Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-09 21:50:37 +00:00
Compare commits
22 Commits
problame/p... ... problame/p...
| Author | SHA1 | Date |
|---|---|---|
|  | 4d7b504c70 |  |
|  | bf065aabdf |  |
|  | fe74fac276 |  |
|  | b91ac670e1 |  |
|  | b3195afd20 |  |
|  | 7eaa7a496b |  |
|  | 4772cd6c93 |  |
|  | 010b4d0d5c |  |
|  | 477cb3717b |  |
|  | ea5a97e7b4 |  |
|  | 547914fe19 |  |
|  | 607b185a49 |  |
|  | bfba5e3aca |  |
|  | ecc7a9567b |  |
|  | 45f98dd018 |  |
|  | bdfe27f3ac |  |
|  | a15f9b3baa |  |
|  | ce92638185 |  |
|  | a3c82f19b8 |  |
|  | 8b15252f98 |  |
|  | 522aaca718 |  |
|  | 7cbb39063a |  |
.github/workflows/build_and_test.yml (vendored)
```yaml
@@ -1092,8 +1092,10 @@ jobs:
      run: |
        if [[ "$GITHUB_REF_NAME" == "main" ]]; then
          gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
        elif [[ "$GITHUB_REF_NAME" == "release" ]]; then

          # TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
          gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
        elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
          gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
        else
          echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
```
```dockerfile
@@ -368,8 +368,8 @@ RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar
FROM build-deps AS plpgsql-check-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/

RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.4.0.tar.gz -O plpgsql_check.tar.gz && \
    echo "9ba58387a279b35a3bfa39ee611e5684e6cddb2ba046ddb2c5190b3bd2ca254a plpgsql_check.tar.gz" | sha256sum --check && \
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
    echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
    mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
    make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
    make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
```
```rust
@@ -116,6 +116,7 @@ fn main() -> Result<()> {
        "attachment_service" => handle_attachment_service(sub_args, &env),
        "safekeeper" => handle_safekeeper(sub_args, &env),
        "endpoint" => handle_endpoint(sub_args, &env),
        "mappings" => handle_mappings(sub_args, &mut env),
        "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
        _ => bail!("unexpected subcommand {sub_name}"),
    };
```
```rust
@@ -816,6 +817,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
    Ok(())
}

fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
    let (sub_name, sub_args) = match sub_match.subcommand() {
        Some(ep_subcommand_data) => ep_subcommand_data,
        None => bail!("no mappings subcommand provided"),
    };

    match sub_name {
        "map" => {
            let branch_name = sub_args
                .get_one::<String>("branch-name")
                .expect("branch-name argument missing");

            let tenant_id = sub_args
                .get_one::<String>("tenant-id")
                .map(|x| TenantId::from_str(x))
                .expect("tenant-id argument missing")
                .expect("malformed tenant-id arg");

            let timeline_id = sub_args
                .get_one::<String>("timeline-id")
                .map(|x| TimelineId::from_str(x))
                .expect("timeline-id argument missing")
                .expect("malformed timeline-id arg");

            env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;

            Ok(())
        }
        other => unimplemented!("mappings subcommand {other}"),
    }
}

fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
        let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
```
```rust
@@ -1084,6 +1117,7 @@ fn cli() -> Command {
    // --id, when using a pageserver command
    let pageserver_id_arg = Arg::new("pageserver-id")
        .long("id")
        .global(true)
        .help("pageserver id")
        .required(false);
    // --pageserver-id when using a non-pageserver command
```
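A side note on the `.global(true)` change above: making `--id` global is what lets the per-subcommand `.arg(pageserver_id_arg.clone())` calls disappear in the next hunk, since clap propagates a global argument to every nested subcommand. A minimal sketch of that mechanic, assuming clap 4's builder API (the `demo` command name is made up):

```rust
use clap::{Arg, Command};

fn main() {
    let cmd = Command::new("demo").subcommand(
        Command::new("pageserver")
            // global(true): the flag is accepted and visible on every
            // nested subcommand, not just on `pageserver` itself.
            .arg(Arg::new("pageserver-id").long("id").global(true).required(false))
            .subcommand(Command::new("status")),
    );
    let m = cmd.get_matches_from(["demo", "pageserver", "status", "--id", "7"]);
    let (_, ps) = m.subcommand().unwrap();
    let (_, status) = ps.subcommand().unwrap();
    // The value shows up on the leaf subcommand's matches.
    assert_eq!(status.get_one::<String>("pageserver-id").unwrap(), "7");
}
```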
```rust
@@ -1254,17 +1288,20 @@ fn cli() -> Command {
    Command::new("pageserver")
        .arg_required_else_help(true)
        .about("Manage pageserver")
        .arg(pageserver_id_arg)
        .subcommand(Command::new("status"))
        .arg(pageserver_id_arg.clone())
        .subcommand(Command::new("start").about("Start local pageserver")
            .arg(pageserver_id_arg.clone())
            .arg(pageserver_config_args.clone()))
        .subcommand(Command::new("stop").about("Stop local pageserver")
            .arg(pageserver_id_arg.clone())
            .arg(stop_mode_arg.clone()))
        .subcommand(Command::new("restart").about("Restart local pageserver")
            .arg(pageserver_id_arg.clone())
            .arg(pageserver_config_args.clone()))
        .subcommand(Command::new("start")
            .about("Start local pageserver")
            .arg(pageserver_config_args.clone())
        )
        .subcommand(Command::new("stop")
            .about("Stop local pageserver")
            .arg(stop_mode_arg.clone())
        )
        .subcommand(Command::new("restart")
            .about("Restart local pageserver")
            .arg(pageserver_config_args.clone())
        )
)
.subcommand(
    Command::new("attachment_service")
```
```rust
@@ -1321,8 +1358,8 @@ fn cli() -> Command {
    .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
    .arg(endpoint_id_arg.clone())
    .arg(tenant_id_arg.clone())
    .arg(branch_name_arg)
    .arg(timeline_id_arg)
    .arg(branch_name_arg.clone())
    .arg(timeline_id_arg.clone())
    .arg(lsn_arg)
    .arg(pg_port_arg)
    .arg(http_port_arg)
```
```rust
@@ -1335,7 +1372,7 @@ fn cli() -> Command {
    .subcommand(
        Command::new("stop")
            .arg(endpoint_id_arg)
            .arg(tenant_id_arg)
            .arg(tenant_id_arg.clone())
            .arg(
                Arg::new("destroy")
                    .help("Also delete data directory (now optional, should be default in future)")
```
```rust
@@ -1346,6 +1383,18 @@ fn cli() -> Command {
        )

    )
    .subcommand(
        Command::new("mappings")
            .arg_required_else_help(true)
            .about("Manage neon_local branch name mappings")
            .subcommand(
                Command::new("map")
                    .about("Create new mapping which cannot exist already")
                    .arg(branch_name_arg.clone())
                    .arg(tenant_id_arg.clone())
                    .arg(timeline_id_arg.clone())
            )
    )
    // Obsolete old name for 'endpoint'. We now just print an error if it's used.
    .subcommand(
        Command::new("pg")
```
```rust
@@ -442,10 +442,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
    trace!("got message {:?}", msg);

    let result = self.process_message(handler, msg, &mut query_string).await;
    self.flush().await?;
    tokio::select!(
        biased;
        _ = shutdown_watcher() => {
            // We were requested to shut down.
            tracing::info!("shutdown request received during response flush");
            return Ok(())
        },
        flush_r = self.flush() => {
            flush_r?;
        }
    );

    match result? {
        ProcessMsgResult::Continue => {
            self.flush().await?;
            continue;
        }
        ProcessMsgResult::Break => break,
```
```rust
@@ -1,8 +1,9 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use thiserror::Error;
use tracing::error;
use tracing::{error, info};

#[derive(Debug, Error)]
pub enum ApiError {
@@ -25,7 +26,7 @@ pub enum ApiError {
    PreconditionFailed(Box<str>),

    #[error("Resource temporarily unavailable: {0}")]
    ResourceUnavailable(String),
    ResourceUnavailable(Cow<'static, str>),

    #[error("Shutting down")]
    ShuttingDown,
```
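The `ResourceUnavailable` payload changes from `String` to `Cow<'static, str>`, so static messages no longer force an allocation while `format!`-built ones still work. A small sketch of the two call shapes this enables (standard library only; the enum here is a cut-down stand-in for the real `ApiError`):

```rust
use std::borrow::Cow;

#[derive(Debug)]
enum ApiError {
    ResourceUnavailable(Cow<'static, str>),
}

fn main() {
    // &'static str: Cow::Borrowed, no allocation.
    let a = ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into());
    // format!(): Cow::Owned, allocation only when the message is dynamic.
    let id = 42;
    let b = ApiError::ResourceUnavailable(format!("tenant {id} is busy").into());
    println!("{a:?}\n{b:?}");
}
```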
```rust
@@ -115,10 +116,12 @@ pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {

pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
    // Print a stack trace for Internal Server errors
    if let ApiError::InternalServerError(_) = api_error {
        error!("Error processing HTTP request: {api_error:?}");
    } else {
        error!("Error processing HTTP request: {api_error:#}");

    match api_error {
        ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
        ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
        ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
        _ => error!("Error processing HTTP request: {api_error:#}"),
    }

    api_error.into_response()
```
```rust
@@ -58,7 +58,7 @@ where
// to get that.
impl<T: Ord> PartialOrd for Waiter<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        other.wake_num.partial_cmp(&self.wake_num)
        Some(self.cmp(other))
    }
}
```
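This is the canonical fix for a hand-written `partial_cmp` that can drift out of sync with `Ord`: delegate to `cmp` and keep the reversal logic in one place. A self-contained sketch of the pattern (this `Waiter` is a simplified stand-in for the generic one in the diff):

```rust
use std::cmp::Ordering;

struct Waiter {
    wake_num: u64,
}

impl PartialEq for Waiter {
    fn eq(&self, other: &Self) -> bool {
        self.wake_num == other.wake_num
    }
}
impl Eq for Waiter {}

impl Ord for Waiter {
    // Reverse ordering, so a max-heap such as BinaryHeap pops the
    // smallest wake_num first.
    fn cmp(&self, other: &Self) -> Ordering {
        other.wake_num.cmp(&self.wake_num)
    }
}

impl PartialOrd for Waiter {
    // Delegating to cmp guarantees partial_cmp and cmp can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let a = Waiter { wake_num: 1 };
    let b = Waiter { wake_num: 2 };
    // Reversed: the smaller wake_num compares as Greater.
    assert_eq!(a.partial_cmp(&b), Some(Ordering::Greater));
}
```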
```rust
@@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use std::ops::Deref;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
```
```rust
@@ -153,7 +154,7 @@ pub struct PageServerConf {
    // that during unit testing, because the current directory is global
    // to the process but different unit tests work on different
    // repositories.
    pub workdir: Utf8PathBuf,
    pub workdir: PageserverConfWorkdir,

    pub pg_distrib_dir: Utf8PathBuf,
```
```rust
@@ -211,6 +212,10 @@ pub struct PageServerConf {

    /// JWT token for use with the control plane API.
    pub control_plane_api_token: Option<SecretString>,

    /// If true, pageserver will make best-effort to operate without a control plane: only
    /// for use in major incidents.
    pub control_plane_emergency_mode: bool,
}

/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -237,6 +242,45 @@ impl<T> BuilderValue<T> {
    }
}

#[derive(Clone, PartialEq, Eq)]
pub struct PageserverConfWorkdir {
    d: Utf8PathBuf,
}

impl Deref for PageServerConf {
    type Target = PageserverConfWorkdir;

    fn deref(&self) -> &Self::Target {
        &self.workdir
    }
}

impl Deref for PageserverConfWorkdir {
    type Target = Utf8Path;

    fn deref(&self) -> &Self::Target {
        &self.d
    }
}

impl AsRef<std::path::Path> for PageserverConfWorkdir {
    fn as_ref(&self) -> &std::path::Path {
        self.d.as_ref()
    }
}

impl std::fmt::Debug for PageserverConfWorkdir {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.d.fmt(f)
    }
}

impl std::fmt::Display for PageserverConfWorkdir {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.d.fmt(f)
    }
}

// needed to simplify config construction
struct PageServerConfigBuilder {
    listen_pg_addr: BuilderValue<String>,
```
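The new `PageserverConfWorkdir` wrapper, combined with `Deref` from `PageServerConf` to the workdir and from the workdir to `Utf8Path`, keeps most existing call sites compiling unchanged: path methods still resolve through auto-deref. A sketch of the mechanic using `std::path` types, which behave like `camino`'s here (the `Workdir` name is illustrative):

```rust
use std::ops::Deref;
use std::path::{Path, PathBuf};

struct Workdir {
    d: PathBuf,
}

impl Deref for Workdir {
    type Target = Path;
    fn deref(&self) -> &Path {
        &self.d
    }
}

fn main() {
    let w = Workdir { d: PathBuf::from("/data/pageserver") };
    // Auto-deref: Path::join is found through Deref, so callers that
    // used to hold a plain PathBuf keep working unchanged.
    let tenants = w.join("tenants");
    println!("{}", tenants.display());
}
```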
```rust
@@ -253,7 +297,7 @@ struct PageServerConfigBuilder {
    page_cache_size: BuilderValue<usize>,
    max_file_descriptors: BuilderValue<usize>,

    workdir: BuilderValue<Utf8PathBuf>,
    workdir: BuilderValue<PageserverConfWorkdir>,

    pg_distrib_dir: BuilderValue<Utf8PathBuf>,
```
```rust
@@ -288,6 +332,7 @@ struct PageServerConfigBuilder {

    control_plane_api: BuilderValue<Option<Url>>,
    control_plane_api_token: BuilderValue<Option<SecretString>>,
    control_plane_emergency_mode: BuilderValue<bool>,
}

impl Default for PageServerConfigBuilder {
@@ -305,7 +350,9 @@ impl Default for PageServerConfigBuilder {
    superuser: Set(DEFAULT_SUPERUSER.to_string()),
    page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
    max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
    workdir: Set(Utf8PathBuf::new()),
    workdir: Set(PageserverConfWorkdir {
        d: Utf8PathBuf::new(),
    }),
    pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
        env::current_dir().expect("cannot access current directory"),
    )
@@ -355,6 +402,7 @@ impl Default for PageServerConfigBuilder {

    control_plane_api: Set(None),
    control_plane_api_token: Set(None),
    control_plane_emergency_mode: Set(false),
        }
    }
}
```
```rust
@@ -393,7 +441,7 @@ impl PageServerConfigBuilder {
    }

    pub fn workdir(&mut self, workdir: Utf8PathBuf) {
        self.workdir = BuilderValue::Set(workdir)
        self.workdir = BuilderValue::Set(PageserverConfWorkdir { d: workdir })
    }

    pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
```
```rust
@@ -491,6 +539,10 @@ impl PageServerConfigBuilder {
        self.control_plane_api_token = BuilderValue::Set(token)
    }

    pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
        self.control_plane_emergency_mode = BuilderValue::Set(enabled)
    }

    pub fn build(self) -> anyhow::Result<PageServerConf> {
        let concurrent_tenant_size_logical_size_queries = self
            .concurrent_tenant_size_logical_size_queries
@@ -582,21 +634,24 @@ impl PageServerConfigBuilder {
            control_plane_api_token: self
                .control_plane_api_token
                .ok_or(anyhow!("missing control_plane_api_token"))?,
            control_plane_emergency_mode: self
                .control_plane_emergency_mode
                .ok_or(anyhow!("missing control_plane_emergency_mode"))?,
        })
    }
}
```
```rust
impl PageServerConf {
impl PageserverConfWorkdir {
    //
    // Repository paths, relative to workdir.
    //

    pub fn tenants_path(&self) -> Utf8PathBuf {
        self.workdir.join(TENANTS_SEGMENT_NAME)
        self.d.join(TENANTS_SEGMENT_NAME)
    }

    pub fn deletion_prefix(&self) -> Utf8PathBuf {
        self.workdir.join("deletion")
        self.d.join("deletion")
    }

    pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
@@ -679,7 +734,7 @@ impl PageServerConf {
    }

    pub fn traces_path(&self) -> Utf8PathBuf {
        self.workdir.join("traces")
        self.d.join("traces")
    }

    pub fn trace_path(
@@ -703,9 +758,11 @@ impl PageServerConf {

    /// Turns storage remote path of a file into its local path.
    pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
        remote_path.with_base(&self.workdir)
        remote_path.with_base(&self.d)
    }
}

impl PageServerConf {
    //
    // Postgres distribution paths
    //
@@ -807,6 +864,10 @@ impl PageServerConf {
                builder.control_plane_api_token(Some(parsed.into()))
            }
        },
        "control_plane_emergency_mode" => {
            builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)

        },
        _ => bail!("unrecognized pageserver option '{key}'"),
        }
    }
@@ -941,6 +1002,9 @@ impl PageServerConf {
    }

    pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {

        let repo_dir = PageserverConfWorkdir { d: repo_dir };

        let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");

        PageServerConf {
@@ -976,6 +1040,7 @@ impl PageServerConf {
            background_task_maximum_delay: Duration::ZERO,
            control_plane_api: None,
            control_plane_api_token: None,
            control_plane_emergency_mode: false,
        }
    }
}
@@ -1199,7 +1264,8 @@ background_task_maximum_delay = '334 s'
                defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
            )?,
            control_plane_api: None,
            control_plane_api_token: None
            control_plane_api_token: None,
            control_plane_emergency_mode: false
        },
        "Correct defaults should be used when no config values are provided"
    );
@@ -1255,7 +1321,8 @@ background_task_maximum_delay = '334 s'
            ondemand_download_behavior_treat_error_as_warn: false,
            background_task_maximum_delay: Duration::from_secs(334),
            control_plane_api: None,
            control_plane_api_token: None
            control_plane_api_token: None,
            control_plane_emergency_mode: false
        },
        "Should be able to parse all basic config values correctly"
    );
@@ -1475,10 +1542,12 @@ threshold = "20m"
        Ok(())
    }

    fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
    fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(PageserverConfWorkdir, Utf8PathBuf)> {
        let tempdir_path = tempdir.path();

        let workdir = tempdir_path.join("workdir");
        let workdir = PageserverConfWorkdir {
            d: tempdir_path.join("workdir"),
        };
        fs::create_dir_all(&workdir)?;

        let pg_distrib_dir = tempdir_path.join("pg_distrib");
```
```rust
@@ -133,6 +133,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
        node_id: self.node_id,
    };

    fail::fail_point!("control-plane-client-re-attach");

    let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
    tracing::info!(
        "Received re-attach response with {} tenants",
@@ -168,6 +170,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
        .collect(),
    };

    fail::fail_point!("control-plane-client-validate");

    let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;

    Ok(response
```
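The two `fail::fail_point!` macros added above are no-ops in production builds and become injection points in tests. Assuming the standard `fail` crate with its failpoints feature enabled, a test would drive them roughly like this (the action string is just an example):

```rust
// Sketch only: requires the `fail` crate compiled with failpoints enabled.
fn main() {
    // Turn the failpoint into a 1000ms sleep whenever it is hit.
    fail::cfg("control-plane-client-re-attach", "sleep(1000)").unwrap();

    // ... exercise the code path that evaluates
    // fail::fail_point!("control-plane-client-re-attach") ...

    // Deactivate again so other tests are unaffected.
    fail::remove("control-plane-client-re-attach");
}
```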
```rust
@@ -40,7 +40,6 @@ use validator::ValidatorQueueMessage;

use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};

// TODO: adminstrative "panic button" config property to disable all deletions
// TODO: configurable for how long to wait before executing deletions

/// We aggregate object deletions from many tenants in one place, for several reasons:
@@ -186,7 +185,7 @@ where
    V: Serialize,
    I: AsRef<[u8]>,
{
    let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
    let transformed = input.iter().map(|(k, v)| (hex::encode(k), v));

    transformed
        .collect::<HashMap<String, &V>>()
```
```rust
@@ -213,7 +212,7 @@ where

/// Files ending with this suffix will be ignored and erased
/// during recovery as startup.
const TEMP_SUFFIX: &str = ".tmp";
const TEMP_SUFFIX: &str = "tmp";

#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
```
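Dropping the leading dot from `TEMP_SUFFIX` and re-adding it where a full extension is needed (see `format!(".{TEMP_SUFFIX}")` two hunks below) lines the constant up with path APIs, whose `extension()` is dot-less. A small illustration using only the standard library:

```rust
const TEMP_SUFFIX: &str = "tmp";

fn main() {
    let temp_extension = format!(".{TEMP_SUFFIX}");
    let name = "0000000000000001-01.list.tmp";

    // Matching on the dotted form avoids false positives on names that
    // merely end in "tmp" without it being an extension.
    assert!(name.ends_with(&temp_extension));
    assert!(!"not-a-temp-file.htmp".ends_with(&temp_extension));

    // Path::extension() yields the dot-less form, matching the constant directly.
    let ext = std::path::Path::new(name).extension().unwrap();
    assert_eq!(ext, TEMP_SUFFIX);
}
```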
```rust
@@ -325,10 +324,7 @@ impl DeletionList {
        return false;
    }

    let timeline_entry = tenant_entry
        .timelines
        .entry(*timeline)
        .or_insert_with(Vec::new);
    let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();

    let timeline_remote_path = remote_timeline_path(tenant, timeline);
```
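`.entry(..).or_insert_with(Vec::new)` and `.entry(..).or_default()` are equivalent when `Default` yields the empty value; the latter is the idiomatic spelling clippy suggests. A quick illustration:

```rust
use std::collections::HashMap;

fn main() {
    let mut timelines: HashMap<u32, Vec<&str>> = HashMap::new();
    // or_default() inserts Vec::default() (an empty Vec) on first access.
    timelines.entry(7).or_default().push("layer-a");
    timelines.entry(7).or_default().push("layer-b");
    assert_eq!(timelines[&7], vec!["layer-a", "layer-b"]);
}
```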
```rust
@@ -230,6 +230,7 @@ impl ListWriter {
    let list_name_pattern =
        Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();

    let temp_extension = format!(".{TEMP_SUFFIX}");
    let header_path = self.conf.deletion_header_path();
    let mut seqs: Vec<u64> = Vec::new();
    while let Some(dentry) = dir.next_entry().await? {
@@ -241,7 +242,7 @@ impl ListWriter {
        continue;
    }

    if dentry_str.ends_with(TEMP_SUFFIX) {
    if dentry_str.ends_with(&temp_extension) {
        info!("Cleaning up temporary file {dentry_str}");
        let absolute_path =
            deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
```
```rust
@@ -220,6 +220,8 @@ where
    warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
    metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
    mutated = true;
} else {
    metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
}
this_list_valid
});
```
```yaml
@@ -93,9 +93,16 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

  delete:
    description: |
      Attempts to delete specified tenant. 500 and 409 errors should be retried until 404 is retrieved.
      Attempts to delete specified tenant. 500, 503 and 409 errors should be retried until 404 is retrieved.
      404 means that deletion successfully finished"
    responses:
      "400":
@@ -134,6 +141,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/timeline:
  parameters:
@@ -178,6 +192,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/timeline/{timeline_id}:
  parameters:
@@ -226,6 +247,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

  delete:
    description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
    responses:
@@ -265,13 +293,19 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/PreconditionFailedError"

      "500":
        description: Generic operation error
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
  parameters:
@@ -328,6 +362,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc:
  parameters:
    - name: tenant_id
@@ -375,6 +416,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/attach:
  parameters:
    - name: tenant_id
@@ -465,6 +513,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/detach:
  parameters:
@@ -518,6 +573,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/ignore:
  parameters:
@@ -560,6 +622,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/load:
  parameters:
@@ -604,6 +673,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/synthetic_size:
  parameters:
@@ -641,6 +717,12 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/size:
  parameters:
@@ -704,6 +786,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/timeline/:
  parameters:
@@ -780,6 +869,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/:
  get:
    description: Get tenants list
@@ -810,6 +906,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

  post:
    description: |
      Create a tenant. Returns new tenant id on success.
@@ -860,6 +963,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/config:
  put:
@@ -905,6 +1015,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/config/:
  parameters:
    - name: tenant_id
@@ -954,6 +1071,13 @@ paths:
          application/json:
            schema:
              $ref: "#/components/schemas/Error"
      "503":
        description: Temporarily unavailable, please retry.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ServiceUnavailableError"

components:
  securitySchemes:
    JWT:
@@ -1220,6 +1344,13 @@ components:
      properties:
        msg:
          type: string
    ServiceUnavailableError:
      type: object
      required:
        - msg
      properties:
        msg:
          type: string
    NotFoundError:
      type: object
      required:
```
```rust
@@ -6,6 +6,7 @@ use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
use hyper::header::CONTENT_TYPE;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
@@ -133,7 +134,7 @@ impl From<PageReconstructError> for ApiError {
        ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
    }
    PageReconstructError::AncestorStopping(_) => {
        ApiError::ResourceUnavailable(format!("{pre}"))
        ApiError::ResourceUnavailable(format!("{pre}").into())
    }
    PageReconstructError::WalRedo(pre) => {
        ApiError::InternalServerError(anyhow::Error::new(pre))
@@ -146,7 +147,7 @@ impl From<TenantMapInsertError> for ApiError {
    fn from(tmie: TenantMapInsertError) -> ApiError {
        match tmie {
            TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
                ApiError::ResourceUnavailable(format!("{tmie}"))
                ApiError::ResourceUnavailable(format!("{tmie}").into())
            }
            TenantMapInsertError::TenantAlreadyExists(id, state) => {
                ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
@@ -395,6 +396,9 @@ async fn timeline_create_handler(
            format!("{err:#}")
        ))
    }
    Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
        json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
    }
    Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
    }
}
```
```rust
@@ -636,7 +640,7 @@ async fn tenant_list_handler(
    .instrument(info_span!("tenant_list"))
    .await
    .map_err(|_| {
        ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".to_string())
        ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
    })?
    .iter()
    .map(|(id, state)| TenantInfo {
@@ -1236,6 +1240,136 @@ async fn deletion_queue_flush(
    }
}

/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
async fn getpage_at_lsn_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    struct Key(crate::repository::Key);

    impl std::str::FromStr for Key {
        type Err = anyhow::Error;

        fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
            crate::repository::Key::from_hex(s).map(Key)
        }
    }

    let key: Key = parse_query_param(&request, "key")?
        .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
    let lsn: Lsn = parse_query_param(&request, "lsn")?
        .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;

    async {
        let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
        let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;

        let page = timeline.get(key.0, lsn, &ctx).await?;

        Result::<_, ApiError>::Ok(
            Response::builder()
                .status(StatusCode::OK)
                .header(CONTENT_TYPE, "application/octet-stream")
                .body(hyper::Body::from(page))
                .unwrap(),
        )
    }
    .instrument(info_span!("timeline_get", %tenant_id, %timeline_id))
    .await
}

async fn timeline_collect_keyspace(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    struct Partitioning {
        keys: crate::keyspace::KeySpace,

        at_lsn: Lsn,
    }

    impl serde::Serialize for Partitioning {
        fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            use serde::ser::SerializeMap;
            let mut map = serializer.serialize_map(Some(2))?;
            map.serialize_key("keys")?;
            map.serialize_value(&KeySpace(&self.keys))?;
            map.serialize_key("at_lsn")?;
            map.serialize_value(&WithDisplay(&self.at_lsn))?;
            map.end()
        }
    }

    struct WithDisplay<'a, T>(&'a T);

    impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
        fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            serializer.collect_str(&self.0)
        }
    }

    struct KeySpace<'a>(&'a crate::keyspace::KeySpace);

    impl<'a> serde::Serialize for KeySpace<'a> {
        fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            use serde::ser::SerializeSeq;
            let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
            for kr in &self.0.ranges {
                seq.serialize_element(&KeyRange(kr))?;
            }
            seq.end()
        }
    }

    struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);

    impl<'a> serde::Serialize for KeyRange<'a> {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            use serde::ser::SerializeTuple;
            let mut t = serializer.serialize_tuple(2)?;
            t.serialize_element(&WithDisplay(&self.0.start))?;
            t.serialize_element(&WithDisplay(&self.0.end))?;
            t.end()
        }
    }

    let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;

    async {
        let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
        let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
        let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
        let keys = timeline
            .collect_keyspace(at_lsn, &ctx)
            .await
            .map_err(ApiError::InternalServerError)?;

        json_response(StatusCode::OK, Partitioning { keys, at_lsn })
    }
    .instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
    .await
}

async fn active_timeline_of_active_tenant(
    tenant_id: TenantId,
    timeline_id: TimelineId,
@@ -1583,5 +1717,12 @@ pub fn make_router(
    .post("/v1/tracing/event", |r| {
        testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
    })
    .get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| {
        testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler)
    })
    .get(
        "/v1/tenant/:tenant_id/timeline/:timeline_id/keyspace",
        |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
    )
    .any(handler_404))
}
```
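Within `timeline_collect_keyspace`, the hand-rolled serde impls render keys and LSNs through `Display` instead of serde's default encodings. The `WithDisplay` adapter is the reusable piece; a reduced, runnable sketch of it (u64 stands in for `Lsn`, and `serde_json` is assumed for the demo):

```rust
use serde::Serialize;

// Serialize any Display type as a JSON string, like the diff's WithDisplay.
struct WithDisplay<'a, T>(&'a T);

impl<'a, T: std::fmt::Display> Serialize for WithDisplay<'a, T> {
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        s.collect_str(self.0)
    }
}

fn main() {
    let lsn: u64 = 0x1696070;
    // serde_json renders the Display output as a plain JSON string.
    let out = serde_json::to_string(&WithDisplay(&lsn)).unwrap();
    assert_eq!(out, "\"23683184\"");
}
```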
```rust
@@ -691,10 +691,9 @@ impl StorageIoTime {
        .expect("failed to define a metric");
    let metrics = std::array::from_fn(|i| {
        let op = StorageIoOperation::from_repr(i).unwrap();
        let metric = storage_io_histogram_vec
        storage_io_histogram_vec
            .get_metric_with_label_values(&[op.as_str()])
            .unwrap();
        metric
            .unwrap()
    });
    Self { metrics }
}
```
```rust
@@ -967,6 +966,7 @@ pub(crate) struct DeletionQueueMetrics {
    pub(crate) keys_submitted: IntCounter,
    pub(crate) keys_dropped: IntCounter,
    pub(crate) keys_executed: IntCounter,
    pub(crate) keys_validated: IntCounter,
    pub(crate) dropped_lsn_updates: IntCounter,
    pub(crate) unexpected_errors: IntCounter,
    pub(crate) remote_errors: IntCounterVec,
@@ -988,7 +988,13 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {

    keys_executed: register_int_counter!(
        "pageserver_deletion_queue_executed_total",
        "Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed."
        "Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed to completion"
    )
    .expect("failed to define a metric"),

    keys_validated: register_int_counter!(
        "pageserver_deletion_queue_validated_total",
        "Number of keys validated for deletion. Sum with pageserver_deletion_queue_dropped_total for the total number of keys that have passed through the validation stage."
    )
    .expect("failed to define a metric"),
```
```rust
@@ -66,8 +66,7 @@
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
//! page, the caller must explicitly call guard.mark_valid() after it has
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
//! initialized it. If the guard is dropped without calling mark_valid(), the
//! mapping is automatically removed and the slot is marked free.
//!
@@ -286,23 +285,25 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected to
/// to initialize.
///
/// is expected to fill in the page contents and call mark_valid().
pub struct PageWriteGuard<'i> {
    inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
    state: PageWriteGuardState<'i>,
}

    _permit: PinnedSlotsPermit,

    // Are the page contents currently valid?
    // Used to mark pages as invalid that are assigned but not yet filled with data.
    valid: bool,
enum PageWriteGuardState<'i> {
    Invalid {
        inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
        _permit: PinnedSlotsPermit,
    },
    Downgraded,
}

impl std::ops::DerefMut for PageWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner.buf
        match &mut self.state {
            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}
```
```rust
@@ -310,25 +311,37 @@ impl std::ops::Deref for PageWriteGuard<'_> {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        self.inner.buf
        match &self.state {
            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}

impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
    fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
        self.inner.buf
        match &mut self.state {
            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}

impl PageWriteGuard<'_> {
impl<'a> PageWriteGuard<'a> {
    /// Mark that the buffer contents are now valid.
    pub fn mark_valid(&mut self) {
        assert!(self.inner.key.is_some());
        assert!(
            !self.valid,
            "mark_valid called on a buffer that was already valid"
        );
        self.valid = true;
    #[must_use]
    pub fn mark_valid(mut self) -> PageReadGuard<'a> {
        let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
        match prev {
            PageWriteGuardState::Invalid { inner, _permit } => {
                assert!(inner.key.is_some());
                PageReadGuard {
                    _permit: Arc::new(_permit),
                    slot_guard: inner.downgrade(),
                }
            }
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}
```
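`mark_valid` now consumes the write guard and returns a `PageReadGuard`, so "initialized but still writable" is no longer a representable state; the old version flipped a `valid: bool` at runtime instead. A stripped-down sketch of that consuming-downgrade idea (types here are illustrative, not the pageserver's):

```rust
struct WriteGuard {
    buf: Vec<u8>,
}

struct ReadGuard {
    buf: Vec<u8>,
}

impl WriteGuard {
    // Consuming `self` means the caller cannot keep writing after the
    // buffer has been declared valid; the type system enforces the
    // state transition instead of a runtime `valid` flag.
    #[must_use]
    fn mark_valid(self) -> ReadGuard {
        ReadGuard { buf: self.buf }
    }
}

fn main() {
    let mut w = WriteGuard { buf: vec![0; 8] };
    w.buf.copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
    let r = w.mark_valid();
    // `w` is moved here; any further write through it is a compile error.
    assert_eq!(r.buf[0], 1);
}
```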
```rust
@@ -339,11 +352,14 @@ impl Drop for PageWriteGuard<'_> {
    /// initializing it, remove the mapping from the page cache.
    ///
    fn drop(&mut self) {
        assert!(self.inner.key.is_some());
        if !self.valid {
            let self_key = self.inner.key.as_ref().unwrap();
            PAGE_CACHE.get().unwrap().remove_mapping(self_key);
            self.inner.key = None;
        match &mut self.state {
            PageWriteGuardState::Invalid { inner, _permit } => {
                assert!(inner.key.is_some());
                let self_key = inner.key.as_ref().unwrap();
                PAGE_CACHE.get().unwrap().remove_mapping(self_key);
                inner.key = None;
            }
            PageWriteGuardState::Downgraded => {}
        }
    }
}
```
```rust
@@ -354,12 +370,6 @@ pub enum ReadBufResult<'a> {
    NotFound(PageWriteGuard<'a>),
}

/// lock_for_write() return value
pub enum WriteBufResult<'a> {
    Found(PageWriteGuard<'a>),
    NotFound(PageWriteGuard<'a>),
}

impl PageCache {
    //
    // Section 1.1: Public interface functions for looking up and memorizing materialized page
```
```rust
@@ -446,20 +456,77 @@ impl PageCache {
        lsn,
    };

    match self.lock_for_write(&cache_key).await? {
        WriteBufResult::Found(write_guard) => {
            // We already had it in cache. Another thread must've put it there
            // concurrently. Check that it had the same contents that we
            // replayed.
            assert!(*write_guard == img);
    let mut permit = Some(self.try_get_pinned_slot_permit().await?);
    loop {
        // First check if the key already exists in the cache.
        if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
            // The page was found in the mapping. Lock the slot, and re-check
            // that it's still what we expected (because we don't released the mapping
            // lock already, another thread could have evicted the page)
            let slot = &self.slots[slot_idx];
            let inner = slot.inner.write().await;
            if inner.key.as_ref() == Some(&cache_key) {
                slot.inc_usage_count();
                debug_assert!(
                    {
                        let guard = inner.permit.lock().unwrap();
                        guard.upgrade().is_none()
                    },
                    "we hold a write lock, so, no one else should have a permit"
                );
                debug_assert_eq!(inner.buf.len(), img.len());
                // We already had it in cache. Another thread must've put it there
                // concurrently. Check that it had the same contents that we
                // replayed.
                assert!(inner.buf == img);
                return Ok(());
            }
        }
        WriteBufResult::NotFound(mut write_guard) => {
            write_guard.copy_from_slice(img);
            write_guard.mark_valid();
        }
    }
        debug_assert!(permit.is_some());

    Ok(())
        // Not found. Find a victim buffer
        let (slot_idx, mut inner) = self
            .find_victim(permit.as_ref().unwrap())
            .await
            .context("Failed to find evict victim")?;

        // Insert mapping for this. At this point, we may find that another
        // thread did the same thing concurrently. In that case, we evicted
        // our victim buffer unnecessarily. Put it into the free list and
        // continue with the slot that the other thread chose.
        if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
            // TODO: put to free list

            // We now just loop back to start from beginning. This is not
            // optimal, we'll perform the lookup in the mapping again, which
            // is not really necessary because we already got
            // 'existing_slot_idx'. But this shouldn't happen often enough
            // to matter much.
            continue;
        }

        // Make the slot ready
        let slot = &self.slots[slot_idx];
        inner.key = Some(cache_key.clone());
        slot.set_usage_count(1);
        // Create a write guard for the slot so we go through the expected motions.
        debug_assert!(
            {
                let guard = inner.permit.lock().unwrap();
                guard.upgrade().is_none()
            },
            "we hold a write lock, so, no one else should have a permit"
        );
        let mut write_guard = PageWriteGuard {
            state: PageWriteGuardState::Invalid {
                _permit: permit.take().unwrap(),
                inner,
            },
        };
        write_guard.copy_from_slice(img);
        let _ = write_guard.mark_valid();
        return Ok(());
    }
}

// Section 1.2: Public interface functions for working with immutable file pages.
```
```rust
@@ -638,99 +705,10 @@ impl PageCache {
        );

        return Ok(ReadBufResult::NotFound(PageWriteGuard {
            _permit: permit.take().unwrap(),
            inner,
            valid: false,
        }));
    }
}

/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(
    &self,
    cache_key: &CacheKey,
    permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
    if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
        // The page was found in the mapping. Lock the slot, and re-check
        // that it's still what we expected (because we don't released the mapping
        // lock already, another thread could have evicted the page)
        let slot = &self.slots[slot_idx];
        let inner = slot.inner.write().await;
        if inner.key.as_ref() == Some(cache_key) {
            slot.inc_usage_count();
            debug_assert!(
                {
                    let guard = inner.permit.lock().unwrap();
                    guard.upgrade().is_none()
                },
                "we hold a write lock, so, no one else should have a permit"
            );
            return Some(PageWriteGuard {
                state: PageWriteGuardState::Invalid {
                    _permit: permit.take().unwrap(),
                    inner,
                valid: true,
            });
        }
    }
    None
}

/// Return a write-locked buffer for given block.
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
    let mut permit = Some(self.try_get_pinned_slot_permit().await?);
    loop {
        // First check if the key already exists in the cache.
        if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
            debug_assert!(permit.is_none());
            return Ok(WriteBufResult::Found(write_guard));
        }
        debug_assert!(permit.is_some());

        // Not found. Find a victim buffer
        let (slot_idx, mut inner) = self
            .find_victim(permit.as_ref().unwrap())
            .await
            .context("Failed to find evict victim")?;

        // Insert mapping for this. At this point, we may find that another
        // thread did the same thing concurrently. In that case, we evicted
        // our victim buffer unnecessarily. Put it into the free list and
        // continue with the slot that the other thread chose.
        if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
            // TODO: put to free list

            // We now just loop back to start from beginning. This is not
            // optimal, we'll perform the lookup in the mapping again, which
            // is not really necessary because we already got
            // 'existing_slot_idx'. But this shouldn't happen often enough
            // to matter much.
            continue;
        }

        // Make the slot ready
        let slot = &self.slots[slot_idx];
        inner.key = Some(cache_key.clone());
        slot.set_usage_count(1);

        debug_assert!(
            {
                let guard = inner.permit.lock().unwrap();
                guard.upgrade().is_none()
            },
            "we hold a write lock, so, no one else should have a permit"
        );

        return Ok(WriteBufResult::NotFound(PageWriteGuard {
            _permit: permit.take().unwrap(),
            inner,
            valid: false,
        }));
    }
}
```
```rust
@@ -775,7 +753,7 @@ impl PageCache {
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
    match key {
        CacheKey::MaterializedPage { hash_key, lsn } => {
            let map = self.materialized_page_map.read().unwrap();
```
```rust
@@ -35,6 +35,7 @@ use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
@@ -64,69 +65,6 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;

fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
    IO: AsyncRead + AsyncWrite + Unpin,
{
    async_stream::try_stream! {
        loop {
            let msg = tokio::select! {
                biased;

                _ = task_mgr::shutdown_watcher() => {
                    // We were requested to shut down.
                    let msg = "pageserver is shutting down";
                    let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
                    Err(QueryError::Other(anyhow::anyhow!(msg)))
                }

                msg = pgb.read_message() => { msg.map_err(QueryError::from)}
            };

            match msg {
                Ok(Some(message)) => {
                    let copy_data_bytes = match message {
                        FeMessage::CopyData(bytes) => bytes,
                        FeMessage::CopyDone => { break },
                        FeMessage::Sync => continue,
                        FeMessage::Terminate => {
                            let msg = "client terminated connection with Terminate message during COPY";
                            let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
                            // error can't happen here, ErrorResponse serialization should be always ok
                            pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
                            Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
                            break;
                        }
                        m => {
                            let msg = format!("unexpected message {m:?}");
                            // error can't happen here, ErrorResponse serialization should be always ok
                            pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
                            Err(io::Error::new(io::ErrorKind::Other, msg))?;
                            break;
                        }
                    };

                    yield copy_data_bytes;
                }
                Ok(None) => {
                    let msg = "client closed connection during COPY";
                    let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
                    // error can't happen here, ErrorResponse serialization should be always ok
                    pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
                    pgb.flush().await?;
                    Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
                }
                Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
                    Err(io_error)?;
                }
                Err(other) => {
                    Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
                }
            };
        }
    }
}
```
```rust
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -284,7 +222,13 @@ async fn page_service_conn_main(
    // and create a child per-query context when it invokes process_query.
    // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
    // and create the per-query context in process_query ourselves.
    let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
    let mut conn_handler = PageServerHandler::new(
        conf,
        broker_client,
        auth,
        connection_ctx,
        task_mgr::shutdown_token(),
    );
    let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;

    match pgbackend
```
@@ -318,6 +262,10 @@ struct PageServerHandler {
|
||||
/// For each query received over the connection,
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
/// A token that should fire when the tenant transitions from
|
||||
/// attached state, or when the pageserver is shutting down.
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
@@ -326,6 +274,7 @@ impl PageServerHandler {
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
_conf: conf,
|
||||
@@ -333,6 +282,91 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
|
||||
/// this rather than naked flush() in order to shut down promptly. Without this, we would
|
||||
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
|
||||
/// in the flush.
|
||||
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
tokio::select!(
|
||||
flush_r = pgb.flush() => {
|
||||
Ok(flush_r?)
|
||||
},
|
||||
_ = self.cancel.cancelled() => {
|
||||
Err(QueryError::Other(anyhow::anyhow!("Shutting down")))
|
||||
}
|
||||
)
|
||||
}

fn copyin_stream<'a, IO>(
&'a self,
pgb: &'a mut PostgresBackend<IO>,
) -> impl Stream<Item = io::Result<Bytes>> + 'a
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
biased;

_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}

msg = pgb.read_message() => { msg.map_err(QueryError::from)}
};

match msg {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => { break },
FeMessage::Sync => continue,
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
};

yield copy_data_bytes;
}
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
Err(io_error)?;
}
Err(other) => {
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
}
};
}
}
}
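copyin_stream yields CopyData payloads as a Stream of io::Result<Bytes>; the call sites further down wrap it in tokio_util::io::StreamReader so the tar import code can treat the COPY protocol as an ordinary AsyncRead. A small sketch of that adapter in isolation (assuming the tokio, tokio-util, bytes, and futures crates):

```rust
use bytes::Bytes;
use futures::stream;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for copyin_stream: a stream of io::Result<Bytes> chunks.
    let chunks = stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"copy ")),
        Ok(Bytes::from_static(b"data")),
    ]);
    // StreamReader turns the chunk stream into an AsyncRead.
    let mut reader = StreamReader::new(chunks);
    let mut buf = String::new();
    reader.read_to_string(&mut buf).await?;
    assert_eq!(buf, "copy data");
    Ok(())
}
```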

@@ -372,7 +406,7 @@ impl PageServerHandler {

// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;

let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);

@@ -465,7 +499,7 @@ impl PageServerHandler {
});

pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;
}
Ok(())
}
@@ -508,9 +542,9 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;

let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
@@ -563,8 +597,8 @@ impl PageServerHandler {
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");

@@ -772,7 +806,7 @@ impl PageServerHandler {

// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;

// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
@@ -824,7 +858,7 @@ impl PageServerHandler {
}

pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;

let basebackup_after = started
.elapsed()

@@ -406,6 +406,8 @@ pub enum CreateTimelineError {
AlreadyExists,
#[error(transparent)]
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
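A hedged sketch of how a caller might map the new AncestorNotActive variant onto an HTTP response; the status-code choices here are illustrative, not taken from this diff:

```rust
// Uses the CreateTimelineError enum shown in the hunk above.
fn to_status(err: &CreateTimelineError) -> u16 {
    match err {
        CreateTimelineError::AlreadyExists => 409,      // conflict
        CreateTimelineError::AncestorLsn(_) => 400,     // bad request
        CreateTimelineError::AncestorNotActive => 503,  // retry later
        CreateTimelineError::Other(_) => 500,
    }
}
```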

@@ -1587,6 +1589,12 @@ impl Tenant {
.get_timeline(ancestor_timeline_id, false)
.context("Cannot branch off the timeline that's not present in pageserver")?;

// instead of waiting around, just deny the request because ancestor is not yet
// ready for other purposes either.
if !ancestor_timeline.is_active() {
return Err(CreateTimelineError::AncestorNotActive);
}

if let Some(lsn) = ancestor_start_lsn.as_mut() {
*lsn = lsn.align();

@@ -1619,8 +1627,6 @@ impl Tenant {
}
};

loaded_timeline.activate(broker_client, None, ctx);

if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
@@ -1632,6 +1638,8 @@ impl Tenant {
})?;
}

loaded_timeline.activate(broker_client, None, ctx);

Ok(loaded_timeline)
}

@@ -186,26 +186,21 @@ impl FileBlockReader {
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();

// Swap for read lock
continue;
}
};
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
}
}
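Both halves of this hunk rely on a changed page-cache API: mark_valid() now consumes the write guard and returns a read guard, so the old "mark valid, then loop to re-acquire a read lease" dance disappears. A sketch of that API shape, with illustrative types (not the real page_cache signatures):

```rust
struct WriteGuard;
struct ReadGuard;

impl WriteGuard {
    // Consuming `self` makes "mark valid, then keep reading" a single
    // step, so callers no longer loop back to re-acquire a read lease.
    fn mark_valid(self) -> ReadGuard {
        ReadGuard
    }
}

fn demo(write_guard: WriteGuard) -> ReadGuard {
    write_guard.mark_valid()
}
```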

@@ -72,36 +72,32 @@ impl EphemeralFile {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();

// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {

@@ -151,61 +151,86 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U

static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));

/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
fn emergency_generations(
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
) -> HashMap<TenantId, Generation> {
tenant_confs
.iter()
.filter_map(|(tid, lc)| {
let lc = match lc {
Ok(lc) => lc,
Err(_) => return None,
};
let gen = match &lc.mode {
LocationMode::Attached(alc) => Some(alc.generation),
LocationMode::Secondary(_) => None,
};

gen.map(|g| (*tid, g))
})
.collect()
}

async fn init_load_generations(
conf: &'static PageServerConf,
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();

let mut tenants = HashMap::new();

// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) {
let result = match client.re_attach().await {
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
resources: &TenantSharedResources,
cancel: &CancellationToken,
) -> anyhow::Result<Option<HashMap<TenantId, Generation>>> {
let generations = if conf.control_plane_emergency_mode {
error!(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach().await {
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
anyhow::bail!("Shut down while waiting for control plane re-attach response")
}
};

// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(result.clone())
.await?;
}

Some(result)
} else {
info!("Control plane API not configured, tenant generations are disabled");
None
return Ok(None);
};

// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())
.await?;
}

Ok(Some(generations))
}

/// Initial stage of load: walk the local tenants directory, clean up any temp files,
/// and load configurations for the tenants we found.
async fn init_load_tenant_configs(
conf: &'static PageServerConf,
) -> anyhow::Result<HashMap<TenantId, anyhow::Result<LocationConf>>> {
let tenants_dir = conf.tenants_path();

let mut dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;

let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
let mut configs = HashMap::new();

loop {
match dir_entries.next() {
None => break,
Some(Ok(dir_entry)) => {
let tenant_dir_path = dir_entry.path().to_path_buf();
Some(Ok(dentry)) => {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
@@ -216,141 +241,158 @@ pub async fn init_tenant_mgr(
tenant_dir_path, e
);
}
} else {
// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
continue;
}

let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
continue;
}

let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {}",
tenant_dir_path
);
continue;
}
};

// Try loading the location configuration
let mut location_conf = match Tenant::load_tenant_config(conf, &tenant_id)
.context("load tenant config")
{
Ok(c) => c,
Err(e) => {
warn!("Marking tenant broken, failed to {e:#}");

tenants.insert(
tenant_id,
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
"error loading tenant location configuration".to_string(),
)),
);

continue;
}
};

let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
match &location_conf.mode {
LocationMode::Secondary(_) => {
// We do not require the control plane's permission for secondary mode
// tenants, because they do no remote writes and hence require no
// generation number
info!("Loaded tenant {tenant_id} in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
// instruct us about secondary attachments. That way, instead of throwing
// away local state, we can gracefully fall back to secondary here, if the control
// plane tells us so.
// (https://github.com/neondatabase/neon/issues/5377)
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) =
safe_remove_tenant_dir_all(&tenant_dir_path).await
{
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
);
}
}
};

continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(
"Starting tenant {} in legacy mode, no generation",
tenant_dir_path
);
Generation::none()
};

// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;

match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
AttachedTenantConf::try_from(location_conf)?,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
}
Err(e) => {
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
}
}
continue;
}

// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
continue;
}

let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
continue;
}

let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",
);
continue;
}
};

configs.insert(tenant_id, Tenant::load_tenant_config(conf, &tenant_id));
}
Some(Err(e)) => {
// On error, print it, but continue with the other tenants. If we error out
// here, the pageserver startup fails altogether, causing outage for *all*
// tenants. That seems worse.
error!(
"Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
// An error listing the top level directory indicates serious problem
// with local filesystem: we will fail to load, and fail to start.
anyhow::bail!(e);
}
}
}
Ok(configs)
}

/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut tenants = HashMap::new();

let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);

// Scan local filesystem for attached tenants
let tenant_configs = init_load_tenant_configs(conf).await?;

// Determine which tenants are to be attached
let tenant_generations =
init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;

// Construct `Tenant` objects and start them running
for (tenant_id, location_conf) in tenant_configs {
let tenant_dir_path = conf.tenant_path(&tenant_id);

let mut location_conf = match location_conf {
Ok(l) => l,
Err(e) => {
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");

tenants.insert(
tenant_id,
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
format!("{}", e),
)),
);
continue;
}
};

let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
match &location_conf.mode {
LocationMode::Secondary(_) => {
// We do not require the control plane's permission for secondary mode
// tenants, because they do no remote writes and hence require no
// generation number
info!(%tenant_id, "Loaded tenant in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
// instruct us about secondary attachments. That way, instead of throwing
// away local state, we can gracefully fall back to secondary here, if the control
// plane tells us so.
// (https://github.com/neondatabase/neon/issues/5377)
info!(%tenant_id, "Detaching tenant, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(%tenant_id,
"Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
);
}
}
};

continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(%tenant_id, "Starting tenant in legacy mode, no generation",);
Generation::none()
};

// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;

match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
AttachedTenantConf::try_from(location_conf)?,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
}
Err(e) => {
error!(%tenant_id, "Failed to start tenant: {e:#}");
}
}
}

@@ -901,9 +901,27 @@ impl RemoteTimelineClient {
.await
.context("list prefixes")?;

let remaining: Vec<RemotePath> = remaining
// We will delete the current index_part object last, since it acts as a deletion
// marker via its deleted_at attribute
let latest_index = remaining
.iter()
.filter(|p| {
p.object_name()
.map(|n| n.starts_with(IndexPart::FILE_NAME))
.unwrap_or(false)
})
.filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
.max_by_key(|i| i.1)
.map(|i| i.0.clone())
.unwrap_or(
// No generation-suffixed indices, assume we are dealing with
// a legacy index.
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
);
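A standalone sketch of the selection rule above: among generation-suffixed index objects, pick the one with the highest generation, falling back to a legacy (unsuffixed) name if none parse. The "index_part.json-&lt;gen&gt;" naming and hex encoding of the suffix are assumptions for illustration; parse_remote_index_path is the real parser in the diff.

```rust
fn pick_latest_index(names: &[&str]) -> Option<String> {
    names
        .iter()
        .filter(|n| n.starts_with("index_part.json"))
        .filter_map(|n| {
            // Treat everything after the last '-' as a hex generation number.
            let (_, suffix) = n.rsplit_once('-')?;
            let gen = u32::from_str_radix(suffix, 16).ok()?;
            Some((n, gen))
        })
        .max_by_key(|(_, gen)| *gen)
        .map(|(n, _)| n.to_string())
}
```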

let remaining_layers: Vec<RemotePath> = remaining
.into_iter()
.filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
.filter(|p| p != &latest_index)
.inspect(|path| {
if let Some(name) = path.object_name() {
info!(%name, "deleting a file not referenced from index_part.json");
@@ -913,9 +931,11 @@ impl RemoteTimelineClient {
})
.collect();

let not_referenced_count = remaining.len();
if !remaining.is_empty() {
self.deletion_queue_client.push_immediate(remaining).await?;
let not_referenced_count = remaining_layers.len();
if !remaining_layers.is_empty() {
self.deletion_queue_client
.push_immediate(remaining_layers)
.await?;
}

fail::fail_point!("timeline-delete-before-index-delete", |_| {
@@ -924,11 +944,9 @@ impl RemoteTimelineClient {
))?
});

let index_file_path = timeline_storage_path.join(Utf8Path::new(IndexPart::FILE_NAME));

debug!("enqueuing index part deletion");
self.deletion_queue_client
.push_immediate([index_file_path].to_vec())
.push_immediate([latest_index].to_vec())
.await?;

// Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait

@@ -31,6 +31,7 @@ pub(super) async fn upload_index_part<'a>(
fail_point!("before-upload-index", |_| {
bail!("failpoint before-upload-index")
});
pausable_failpoint!("before-upload-index-pausable");

let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;

@@ -511,8 +511,7 @@ impl DeltaLayer {
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
let mut summary_buf = vec![0; PAGE_SZ];
file.read_exact_at(&mut summary_buf, 0)?;
let summary = Summary::des_prefix(&summary_buf)?;

@@ -400,8 +400,7 @@ impl ImageLayer {
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
let mut summary_buf = vec![0; PAGE_SZ];
file.read_exact_at(&mut summary_buf, 0)?;
let summary = Summary::des_prefix(&summary_buf)?;
let metadata = file

@@ -158,7 +158,7 @@ pub struct Timeline {

/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
/// Never changes for the lifetime of this [`Timeline`] object.
///
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
generation: Generation,
@@ -2363,7 +2363,7 @@ impl Timeline {
// during branch creation.
match ancestor.wait_to_become_active(ctx).await {
Ok(()) => {}
Err(state) if state == TimelineState::Stopping => {
Err(TimelineState::Stopping) => {
return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
}
Err(state) => {

@@ -825,7 +825,7 @@ impl PostgresRedoManager {
while nwrite < writebuf.len() {
let n = loop {
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
@@ -917,7 +917,7 @@ impl PostgresRedoManager {
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
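Both hunks replace a guard clause (Err(e) if e == ...) with a direct pattern match on the errno. The surrounding retry-on-EINTR loop is a common shape; a self-contained sketch assuming the nix crate:

```rust
use nix::errno::Errno;

// Re-run a syscall wrapper until it completes without being
// interrupted by a signal (EINTR).
fn retry_eintr<T>(mut f: impl FnMut() -> nix::Result<T>) -> nix::Result<T> {
    loop {
        match f() {
            Err(Errno::EINTR) => continue, // interrupted: retry
            res => break res,
        }
    }
}
```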

@@ -7,12 +7,12 @@ OBJS = \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \
neon.o \
neon_utils.o \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o \
walproposer_pg.o \
control_plane_connector.o

PG_CPPFLAGS = -I$(libpq_srcdir)

@@ -30,7 +30,7 @@

#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "neon_utils.h"

#define PageStoreTrace DEBUG5

@@ -1,424 +0,0 @@
#include "postgres.h"

#include "libpq-fe.h"
#include "neon.h"
#include "walproposer.h"

/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* walprop_async_read */
};

/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
return true;

/* Otherwise, set it appropriately */
if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
return false;

conn->is_nonblocking = is_nonblocking;
return true;
}

/* Exported function definitions */
char *
walprop_error_message(WalProposerConn *conn)
{
return PQerrorMessage(conn->pg_conn);
}

WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
switch (PQstatus(conn->pg_conn))
{
case CONNECTION_OK:
return WP_CONNECTION_OK;
case CONNECTION_BAD:
return WP_CONNECTION_BAD;
default:
return WP_CONNECTION_IN_PROGRESS;
}
}

WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
{
WalProposerConn *conn;
PGconn *pg_conn;
const char *keywords[3];
const char *values[3];
int n;

/*
* Connect using the given connection string. If the
* NEON_AUTH_TOKEN environment variable was set, use that as
* the password.
*
* The connection options are parsed in the order they're given, so
* when we set the password before the connection string, the
* connection string can override the password from the env variable.
* Seems useful, although we don't currently use that capability
* anywhere.
*/
n = 0;
if (password)
{
keywords[n] = "password";
values[n] = password;
n++;
}
keywords[n] = "dbname";
values[n] = conninfo;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pg_conn = PQconnectStartParams(keywords, values, 1);

/*
* Allocation of a PQconn can fail, and will return NULL. We want to fully
* replicate the behavior of PQconnectStart here.
*/
if (!pg_conn)
return NULL;

/*
* And in theory this allocation can fail as well, but it's incredibly
* unlikely if we just successfully allocated a PGconn.
*
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking
* mode */
conn->recvbuf = NULL;
return conn;
}

WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
WalProposerConnectPollStatusType return_val;

switch (PQconnectPoll(conn->pg_conn))
{
case PGRES_POLLING_FAILED:
return_val = WP_CONN_POLLING_FAILED;
break;
case PGRES_POLLING_READING:
return_val = WP_CONN_POLLING_READING;
break;
case PGRES_POLLING_WRITING:
return_val = WP_CONN_POLLING_WRITING;
break;
case PGRES_POLLING_OK:
return_val = WP_CONN_POLLING_OK;
break;

/*
* There's a comment at its source about this constant being
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");

/*
* This return is never actually reached, but it's here to make
* the compiler happy
*/
return WP_CONN_POLLING_FAILED;

default:
Assert(false);
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
}

return return_val;
}

bool
walprop_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush
*/
if (!ensure_nonblocking_status(conn, false))
return false;

/* PQsendQuery returns 1 on success, 0 on failure */
if (!PQsendQuery(conn->pg_conn, query))
return false;

return true;
}

WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;

/* Marker variable if we need to log an unexpected success result */
char *unexpected_success = NULL;

/* Consume any input that we might be missing */
if (!PQconsumeInput(conn->pg_conn))
return WP_EXEC_FAILED;

if (PQisBusy(conn->pg_conn))
return WP_EXEC_NEEDS_INPUT;

result = PQgetResult(conn->pg_conn);

/*
* PQgetResult returns NULL only if getting the result was successful &
* there's no more of the result to get.
*/
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}

/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
unexpected_success = msg; \
break;

switch (PQresultStatus(result))
{
/* "true" success case */
case PGRES_COPY_BOTH:
return_val = WP_EXEC_SUCCESS_COPYBOTH;
break;

/* Unexpected success case */
case PGRES_EMPTY_QUERY:
UNEXPECTED_SUCCESS("empty query return");
case PGRES_COMMAND_OK:
UNEXPECTED_SUCCESS("data-less command end");
case PGRES_TUPLES_OK:
UNEXPECTED_SUCCESS("tuples return");
case PGRES_COPY_OUT:
UNEXPECTED_SUCCESS("'Copy Out' response");
case PGRES_COPY_IN:
UNEXPECTED_SUCCESS("'Copy In' response");
case PGRES_SINGLE_TUPLE:
UNEXPECTED_SUCCESS("single tuple return");
case PGRES_PIPELINE_SYNC:
UNEXPECTED_SUCCESS("pipeline sync point");

/* Failure cases */
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_PIPELINE_ABORTED:
return_val = WP_EXEC_FAILED;
break;

default:
Assert(false);
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
}

if (unexpected_success)
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);

return return_val;
}

pgsocket
walprop_socket(WalProposerConn *conn)
{
return PQsocket(conn->pg_conn);
}

int
walprop_flush(WalProposerConn *conn)
{
return (PQflush(conn->pg_conn));
}

void
walprop_finish(WalProposerConn *conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
PQfinish(conn->pg_conn);
pfree(conn);
}

/*
* Receive a message from the safekeeper.
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
int result;

if (conn->recvbuf != NULL)
{
PQfreemem(conn->recvbuf);
conn->recvbuf = NULL;
}

/* Call PQconsumeInput so that we have the data we need */
if (!PQconsumeInput(conn->pg_conn))
{
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}

/*
* The docs for PQgetCopyData list the return values as: 0 if the copy is
* still in progress, but no "complete row" is available -1 if the copy is
* done -2 if an error occurred (> 0) if it was successful; that value is
* the amount transferred.
*
* The protocol we use between walproposer and safekeeper means that we
* *usually* wouldn't expect to see that the copy is done, but this can
* sometimes be triggered by the server returning an ErrorResponse (which
* also happens to have the effect that the copy is done).
*/
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
{
case 0:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_TRY_AGAIN;
case -1:
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server
* failed; it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));

if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);

/*
* If there was actually an error, it'll be properly reported
* by calls to PQerrorMessage -- we don't have to do anything
* else
*/
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
case -2:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
default:
/* Positive values indicate the size of the returned result */
*amount = result;
*buf = conn->recvbuf;
return PG_ASYNC_READ_SUCCESS;
}
}
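For reference, an illustrative Rust rendering of the PQgetCopyData return-code contract the removed function handles; the enum and names are invented for this sketch, not part of any library:

```rust
// PQgetCopyData returns: 0 = no complete row yet, -1 = copy done,
// -2 = error, > 0 = number of bytes transferred.
enum CopyRead {
    TryAgain,
    Done,
    Failed,
    Data(usize),
}

fn classify(ret: i32) -> CopyRead {
    match ret {
        0 => CopyRead::TryAgain,
        -1 => CopyRead::Done,
        -2 => CopyRead::Failed,
        n if n > 0 => CopyRead::Data(n as usize),
        _ => CopyRead::Failed, // libpq documents no other values
    }
}
```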

PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;

/* If we aren't in non-blocking mode, switch to it. */
if (!ensure_nonblocking_status(conn, true))
return PG_ASYNC_WRITE_FAIL;

/*
* The docs for PQputCopyData list the return values as: 1 if the data was
* queued, 0 if it was not queued because of full buffers, or -1 if an
* error occurred
*/
result = PQputCopyData(conn->pg_conn, buf, size);

/*
* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more
*/
Assert(result != 0);

switch (result)
{
case 1:
/* good -- continue */
break;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQputCopyData", result);
}

/*
* After queueing the data, we still need to flush to get it to send. This
* might take multiple tries, but we don't want to wait around until it's
* done.
*
* PQflush has the following returns (directly quoting the docs): 0 if
* successful, 1 if it was unable to send all the data in the send queue
* yet -1 if it failed for some reason
*/
switch (result = PQflush(conn->pg_conn))
{
case 0:
return PG_ASYNC_WRITE_SUCCESS;
case 1:
return PG_ASYNC_WRITE_TRY_FLUSH;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQflush", result);
}
}

/*
* This function is very similar to walprop_async_write. For more
* information, refer to the comments there.
*/
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;

/* If we are in non-blocking mode, switch out of it. */
if (!ensure_nonblocking_status(conn, false))
return false;

if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;

Assert(result == 1);

/* Because the connection is non-blocking, flushing returns 0 or -1 */

if ((result = PQflush(conn->pg_conn)) == -1)
return false;

Assert(result == 0);
return true;
}
@@ -18,6 +18,10 @@ extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;

extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;

extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);

@@ -30,4 +34,10 @@ extern void pg_init_extension_server(void);
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);

extern uint64 BackpressureThrottlingTime(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);

extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);

#endif /* NEON_H */

116
pgxn/neon/neon_utils.c
Normal file
@@ -0,0 +1,116 @@
#include "postgres.h"

#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"

#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"

#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>

#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif

/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;

return -1;
}

/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;

for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);

if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}

return true;
}

/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;

pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));

return n32;
}

/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;

pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));

return n64;
}

/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}

/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}
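The C helpers above memcpy native byte order, and their comments assume little-endian targets. A Rust sketch that makes the LE choice explicit (std only; function names are illustrative):

```rust
// Read a 4-byte little-endian integer from the front of a buffer.
fn get_u32_le(buf: &[u8]) -> u32 {
    u32::from_le_bytes(buf[..4].try_into().unwrap())
}

// Append an 8-byte integer to a buffer in little-endian order.
fn put_u64_le(out: &mut Vec<u8>, v: u64) {
    out.extend_from_slice(&v.to_le_bytes());
}
```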
12
pgxn/neon/neon_utils.h
Normal file
@@ -0,0 +1,12 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__

#include "postgres.h"

bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);

#endif /* __NEON_UTILS_H__ */
@@ -721,7 +721,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls

/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;

Retry:
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);

if (entry != NULL)
@@ -858,7 +858,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
if (!page_server->flush())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
}
MyPState->ring_flush = MyPState->ring_unused;
}
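This hunk turns a fire-and-forget flush into a checked one: a failed flush resets the prefetch set, so the request must be registered again (hence the goto Retry). The control flow in isolation, with illustrative stand-ins for the page-server calls:

```rust
// `flush` returning false models the page-server flush failing and
// resetting the prefetch set; `register_request` models re-registering.
fn register_with_flush(mut flush: impl FnMut() -> bool, mut register_request: impl FnMut()) {
    loop {
        register_request();
        if flush() {
            break; // flushed successfully; the registration stands
        }
        // flush failure reset the prefetch set: loop and register again
    }
}
```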
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,8 +1,8 @@
|
||||
#ifndef __NEON_WALPROPOSER_H__
|
||||
#define __NEON_WALPROPOSER_H__
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "postgres.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "port.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/transam.h"
|
||||
@@ -16,29 +16,15 @@
|
||||
#define MAX_SAFEKEEPERS 32
|
||||
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
|
||||
* message */
|
||||
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
|
||||
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
|
||||
* message header */
|
||||
#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender*
|
||||
* message header */
|
||||
|
||||
/*
|
||||
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occurred,
|
||||
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
|
||||
*/
|
||||
#define WL_NO_EVENTS 0
|
||||
|
||||
extern char *wal_acceptors_list;
|
||||
extern int wal_acceptor_reconnect_timeout;
|
||||
extern int wal_acceptor_connection_timeout;
|
||||
extern bool am_wal_proposer;
|
||||
|
||||
struct WalProposerConn; /* Defined in libpqwalproposer */
|
||||
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
|
||||
typedef struct WalProposerConn WalProposerConn;
|
||||
|
||||
struct WalMessage;
|
||||
typedef struct WalMessage WalMessage;
|
||||
|
||||
/* Possible return values from ReadPGAsync */
|
||||
typedef enum
|
||||
{
|
||||
@@ -52,7 +38,7 @@ typedef enum
|
||||
PG_ASYNC_READ_TRY_AGAIN,
|
||||
/* Reading failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_READ_FAIL,
|
||||
} PGAsyncReadResult;
|
||||
} PGAsyncReadResult;
|
||||
|
||||
/* Possible return values from WritePGAsync */
|
||||
typedef enum
|
||||
@@ -71,7 +57,7 @@ typedef enum
|
||||
PG_ASYNC_WRITE_TRY_FLUSH,
|
||||
/* Writing failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_WRITE_FAIL,
|
||||
} PGAsyncWriteResult;
|
||||
} PGAsyncWriteResult;
|
||||
|
||||
/*
|
||||
* WAL safekeeper state, which is used to wait for some event.
|
||||
@@ -147,7 +133,7 @@ typedef enum
|
||||
* to read.
|
||||
*/
|
||||
SS_ACTIVE,
|
||||
} SafekeeperState;
|
||||
} SafekeeperState;
|
||||
|
||||
/* Consensus logical timestamp. */
|
||||
typedef uint64 term_t;
|
||||
@@ -171,12 +157,12 @@ typedef struct ProposerGreeting
|
||||
uint8 tenant_id[16];
|
||||
TimeLineID timeline;
|
||||
uint32 walSegSize;
|
||||
} ProposerGreeting;
|
||||
} ProposerGreeting;
|
||||
|
||||
typedef struct AcceptorProposerMessage
|
||||
{
|
||||
uint64 tag;
|
||||
} AcceptorProposerMessage;
|
||||
} AcceptorProposerMessage;
|
||||
|
||||
/*
|
||||
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
|
||||
@@ -186,7 +172,7 @@ typedef struct AcceptorGreeting
|
||||
AcceptorProposerMessage apm;
|
||||
term_t term;
|
||||
NNodeId nodeId;
|
||||
} AcceptorGreeting;
|
||||
} AcceptorGreeting;
|
||||
|
||||
/*
|
||||
* Proposer -> Acceptor vote request.
|
||||
@@ -196,20 +182,20 @@ typedef struct VoteRequest
|
||||
uint64 tag;
|
||||
term_t term;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} VoteRequest;
|
||||
} VoteRequest;
|
||||
|
||||
/* Element of term switching chain. */
|
||||
typedef struct TermSwitchEntry
|
||||
{
|
||||
term_t term;
|
||||
XLogRecPtr lsn;
|
||||
} TermSwitchEntry;
|
||||
} TermSwitchEntry;
|
||||
|
||||
typedef struct TermHistory
|
||||
{
|
||||
uint32 n_entries;
|
||||
TermSwitchEntry *entries;
|
||||
} TermHistory;
|
||||
} TermHistory;
|
||||
|
||||
/* Vote itself, sent from safekeeper to proposer */
|
||||
typedef struct VoteResponse
|
||||
@@ -227,7 +213,7 @@ typedef struct VoteResponse
|
||||
* recovery of some safekeeper */
|
||||
TermHistory termHistory;
|
||||
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
|
||||
} VoteResponse;
|
||||
} VoteResponse;
|
||||
|
||||
/*
|
||||
* Proposer -> Acceptor message announcing proposer is elected and communicating
|
||||
@@ -243,7 +229,7 @@ typedef struct ProposerElected
|
||||
TermHistory *termHistory;
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
} ProposerElected;
|
||||
} ProposerElected;
|
||||
|
||||
/*
|
||||
* Header of request with WAL message sent from proposer to safekeeper.
|
||||
@@ -268,7 +254,7 @@ typedef struct AppendRequestHeader
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} AppendRequestHeader;
|
||||
} AppendRequestHeader;
|
||||
|
||||
/*
|
||||
* Hot standby feedback received from replica
|
||||
@@ -278,7 +264,7 @@ typedef struct HotStandbyFeedback
|
||||
TimestampTz ts;
|
||||
FullTransactionId xmin;
|
||||
FullTransactionId catalog_xmin;
|
||||
} HotStandbyFeedback;
|
||||
} HotStandbyFeedback;
|
||||
|
||||
typedef struct PageserverFeedback
|
||||
{
|
||||
@@ -289,7 +275,7 @@ typedef struct PageserverFeedback
|
||||
XLogRecPtr disk_consistent_lsn;
|
||||
XLogRecPtr remote_consistent_lsn;
|
||||
TimestampTz replytime;
|
||||
} PageserverFeedback;
|
||||
} PageserverFeedback;
|
||||
|
||||
typedef struct WalproposerShmemState
|
||||
{
|
||||
@@ -297,7 +283,7 @@ typedef struct WalproposerShmemState
|
||||
PageserverFeedback feedback;
|
||||
term_t mineLastElectedTerm;
|
||||
pg_atomic_uint64 backpressureThrottlingTime;
|
||||
} WalproposerShmemState;
|
||||
} WalproposerShmemState;
|
||||
|
||||
/*
|
||||
* Report safekeeper state to proposer
|
||||
@@ -321,17 +307,22 @@ typedef struct AppendResponse
|
||||
/* and custom neon feedback. */
|
||||
/* This part of the message is extensible. */
|
||||
PageserverFeedback rf;
|
||||
} AppendResponse;
|
||||
} AppendResponse;
|
||||
|
||||
/* PageserverFeedback is extensible part of the message that is parsed separately */
|
||||
/* Other fields are fixed part */
|
||||
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)

struct WalProposer;
typedef struct WalProposer WalProposer;

/*
 * Descriptor of safekeeper
 */
typedef struct Safekeeper
{
	WalProposer *wp;

	char const *host;
	char const *port;

@@ -340,7 +331,7 @@ typedef struct Safekeeper
	 *
	 * May contain private information like password and should not be logged.
	 */
	char		conninfo[MAXCONNINFO];

	/*
	 * postgres protocol connection to the WAL acceptor
@@ -373,27 +364,12 @@ typedef struct Safekeeper
	int			eventPos;		/* position in wait event set. Equal to -1 if
								 * no event */
	SafekeeperState state;		/* safekeeper state machine state */
	TimestampTz latestMsgReceivedAt;	/* when latest msg is received */
	AcceptorGreeting greetResponse; /* acceptor greeting */
	VoteResponse voteResponse;	/* the vote */
	AppendResponse appendResponse;	/* feedback for master */
} Safekeeper;

extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
										   PageserverFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);

extern Size WalproposerShmemSize(void);
extern bool WalproposerShmemInit(void);
extern void replication_feedback_set(PageserverFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);

/* libpqwalproposer hooks & helper type */

/* Re-exported PostgresPollingStatusType */
typedef enum
{
@@ -406,7 +382,7 @@ typedef enum
	 * 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
	 * We've removed it here to avoid clutter.
	 */
} WalProposerConnectPollStatusType;

/* Re-exported and modified ExecStatusType */
typedef enum
@@ -431,7 +407,7 @@ typedef enum
	WP_EXEC_NEEDS_INPUT,
	/* Catch-all failure. Check PQerrorMessage. */
	WP_EXEC_FAILED,
} WalProposerExecStatusType;

/* Re-exported ConnStatusType */
typedef enum
@@ -445,67 +421,252 @@ typedef enum
	 * that extra functionality, so we collect them into a single tag here.
	 */
	WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;

/* Re-exported PQerrorMessage */
extern char *walprop_error_message(WalProposerConn *conn);

/* Re-exported PQstatus */
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);

/* Re-exported PQconnectStart */
extern WalProposerConn * walprop_connect_start(char *conninfo, char *password);

/* Re-exported PQconnectPoll */
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);

/* Blocking wrapper around PQsendQuery */
extern bool walprop_send_query(WalProposerConn *conn, char *query);

/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);

/* Re-exported PQsocket */
extern pgsocket walprop_socket(WalProposerConn *conn);

/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
extern int	walprop_flush(WalProposerConn *conn);

/* Re-exported PQfinish */
extern void walprop_finish(WalProposerConn *conn);

/*
 * Ergonomic wrapper around PGgetCopyData
 *
 * Reads a CopyData block from a safekeeper, setting *amount to the number
 * of bytes returned.
 *
 * This function is allowed to assume certain properties specific to the
 * protocol with the safekeepers, so it should not be used as-is for any
 * other purpose.
 *
 * Note: If possible, using <AsyncRead> is generally preferred, because it
 * performs a bit of extra checking work that's always required and is normally
 * somewhat verbose.
 */
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);

/*
 * Collection of hooks for walproposer, to call postgres functions,
 * read WAL and send it over the network.
 */
typedef struct walproposer_api
{
	/*
	 * Get WalproposerShmemState. This is used to store information about the
	 * last elected term.
	 */
	WalproposerShmemState *(*get_shmem_state) (void);

	/*
	 * Start receiving notifications about new WAL. This is an infinite loop
	 * which calls WalProposerBroadcast() and WalProposerPoll() to send the
	 * WAL.
	 */
	void		(*start_streaming) (WalProposer *wp, XLogRecPtr startpos);

	/* Get pointer to the latest available WAL. */
	XLogRecPtr	(*get_flush_rec_ptr) (void);

	/* Get current time. */
	TimestampTz (*get_current_timestamp) (void);

	/* Get postgres timeline. */
	TimeLineID	(*get_timeline_id) (void);

	/* Current error message, aka PQerrorMessage. */
	char	   *(*conn_error_message) (WalProposerConn *conn);

	/* Connection status, aka PQstatus. */
	WalProposerConnStatusType (*conn_status) (WalProposerConn *conn);

	/* Start the connection, aka PQconnectStart. */
	WalProposerConn *(*conn_connect_start) (char *conninfo);

	/* Poll an asynchronous connection, aka PQconnectPoll. */
	WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn);

	/* Send a blocking SQL query, aka PQsendQuery. */
	bool		(*conn_send_query) (WalProposerConn *conn, char *query);

	/* Read the query result, aka PQgetResult. */
	WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn);

	/* Flush buffer to the network, aka PQflush. */
	int			(*conn_flush) (WalProposerConn *conn);

	/* Close the connection, aka PQfinish. */
	void		(*conn_finish) (WalProposerConn *conn);

	/* Try to read CopyData message, aka PQgetCopyData. */
	PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount);

	/* Try to write CopyData message, aka PQputCopyData. */
	PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size);

	/* Blocking CopyData write, aka PQputCopyData + PQflush. */
	bool		(*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size);

	/* Download WAL from startpos to endpos and make it available locally. */
	bool		(*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);

	/* Read WAL from disk to buf. */
	void		(*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);

	/* Allocate WAL reader. */
	XLogReaderState *(*wal_reader_allocate) (void);

	/* Deallocate event set. */
	void		(*free_event_set) (void);

	/* Initialize event set. */
	void		(*init_event_set) (int n_safekeepers);

	/* Update events for an existing safekeeper connection. */
	void		(*update_event_set) (Safekeeper *sk, uint32 events);

	/* Add a new safekeeper connection to the event set. */
	void		(*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);

	/*
	 * Wait until some event happens: - timeout is reached - socket event for
	 * safekeeper connection - new WAL is available
	 *
	 * Returns 0 if timeout is reached, 1 if some event happened. Updates
	 * events mask to indicate events and sets sk to the safekeeper which has
	 * an event.
	 */
	int			(*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);

	/* Read random bytes. */
	bool		(*strong_random) (void *buf, size_t len);

	/*
	 * Get a basebackup LSN. Used to cross-validate with the latest available
	 * LSN on the safekeepers.
	 */
	XLogRecPtr	(*get_redo_start_lsn) (void);

	/*
	 * Finish sync safekeepers with the given LSN. This function should not
	 * return and should exit the program.
	 */
	void		(*finish_sync_safekeepers) (XLogRecPtr lsn);

	/*
	 * Called after every new message from the safekeeper. Used to propagate
	 * backpressure feedback and to confirm WAL persistence (has been
	 * committed on the quorum of safekeepers).
	 */
	void		(*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);

	/*
	 * Called on peer_horizon_lsn updates. Used to advance the replication
	 * slot and to free up disk space by deleting unnecessary WAL.
	 */
	void		(*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
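
As a minimal sketch (not part of this diff) of how the hook table is meant to be used: a host environment fills walproposer_api with its own implementations and hands it to WalProposerCreate(), declared further below. The my_* names are hypothetical stubs.

/* Hypothetical stub implementations, assumed to be defined by the host. */
static WalproposerShmemState *my_get_shmem_state(void);
static void my_start_streaming(WalProposer *wp, XLogRecPtr startpos);
static XLogRecPtr my_get_flush_rec_ptr(void);

static const walproposer_api my_api = {
	.get_shmem_state = my_get_shmem_state,
	.start_streaming = my_start_streaming,
	.get_flush_rec_ptr = my_get_flush_rec_ptr,
	/* ...the remaining hooks are wired up the same way... */
};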

/*
 * Ergonomic wrapper around PQputCopyData + PQflush
 *
 * Starts to write a CopyData block to a safekeeper.
 *
 * For information on the meaning of return codes, refer to PGAsyncWriteResult.
 */
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);

/*
 * Configuration of the WAL proposer.
 */
typedef struct WalProposerConfig
{
	/* hex-encoded TenantId cstr */
	char	   *neon_tenant;

	/* hex-encoded TimelineId cstr */
	char	   *neon_timeline;

	/*
	 * Comma-separated list of safekeepers, in the following format:
	 * host1:port1,host2:port2,host3:port3
	 *
	 * This cstr should be editable.
	 */
	char	   *safekeepers_list;

	/*
	 * WalProposer reconnects to offline safekeepers once in this interval.
	 * Time is in milliseconds.
	 */
	int			safekeeper_reconnect_timeout;

	/*
	 * WalProposer terminates the connection if it doesn't receive any message
	 * from the safekeeper in this interval. Time is in milliseconds.
	 */
	int			safekeeper_connection_timeout;

	/*
	 * WAL segment size. Will be passed to safekeepers in greet request. Also
	 * used to detect page headers.
	 */
	int			wal_segment_size;

	/*
	 * If started in sync mode, walproposer will not subscribe to new WAL and
	 * will exit once a quorum of safekeepers has been synced to the latest
	 * available LSN.
	 */
	bool		syncSafekeepers;

	/* Will be passed to safekeepers in greet request. */
	uint64		systemId;
} WalProposerConfig;


/*
 * Blocking equivalent to walprop_async_write_fn
 *
 * Returns 'true' if successful, 'false' on failure.
 */
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);

extern uint64 BackpressureThrottlingTime(void);

/*
 * WAL proposer state.
 */
typedef struct WalProposer
{
	WalProposerConfig *config;
	int			n_safekeepers;

	/* (n_safekeepers / 2) + 1 */
	int			quorum;

	Safekeeper	safekeeper[MAX_SAFEKEEPERS];

	/* WAL has been generated up to this point */
	XLogRecPtr	availableLsn;

	/* last commitLsn broadcast to safekeepers */
	XLogRecPtr	lastSentCommitLsn;

	ProposerGreeting greetRequest;

	/* Vote request for safekeeper */
	VoteRequest voteRequest;

	/*
	 * Minimal LSN which may be needed for recovery of some safekeeper,
	 * record-aligned (first record which might not yet have been received by
	 * someone).
	 */
	XLogRecPtr	truncateLsn;

	/*
	 * Term of the proposer. We want our term to be highest and unique, so we
	 * collect terms from a quorum of safekeepers, choose the max and add 1.
	 * After that our term is fixed and must not change. If we observe that
	 * some safekeeper has a higher term, it means that we have another
	 * running compute, so we must stop immediately.
	 */
	term_t		propTerm;

	/* term history of the proposer */
	TermHistory propTermHistory;

	/* epoch start lsn of the proposer */
	XLogRecPtr	propEpochStartLsn;

	/* Most advanced acceptor epoch */
	term_t		donorEpoch;

	/* Most advanced acceptor */
	int			donor;

	/* timeline globally starts at this LSN */
	XLogRecPtr	timelineStartLsn;

	/* number of votes collected from safekeepers */
	int			n_votes;

	/* number of successful connections over the lifetime of walproposer */
	int			n_connected;

	/*
	 * Timestamp of the last reconnection attempt. Related to
	 * config->safekeeper_reconnect_timeout
	 */
	TimestampTz last_reconnect_attempt;

	walproposer_api api;
} WalProposer;

extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api);
extern void WalProposerStart(WalProposer *wp);
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(WalProposer *wp);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
										   PageserverFeedback *rf);

#endif /* __NEON_WALPROPOSER_H__ */
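
Putting the pieces together, the declarations above suggest the following call sequence. This is an illustrative sketch only: every concrete value below is made up, and error handling is omitted.

static void
start_walproposer_sketch(walproposer_api api)
{
	static WalProposerConfig config;
	WalProposer *wp;

	config.neon_tenant = "ee6016ecdf4f15a5f1b8dc7bbd5f6e6c";	/* made-up id */
	config.neon_timeline = "1d3c22c2f07e46f8ae0a3f1c4c3f0c4e";	/* made-up id */
	config.safekeepers_list = strdup("sk1:6500,sk2:6500,sk3:6500");	/* must stay editable */
	config.safekeeper_reconnect_timeout = 1000;		/* ms */
	config.safekeeper_connection_timeout = 10000;	/* ms */
	config.wal_segment_size = 16 * 1024 * 1024;
	config.syncSafekeepers = false;

	wp = WalProposerCreate(&config, api);
	WalProposerStart(wp);		/* runs the proposer's main loop */
}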

pgxn/neon/walproposer_pg.c: new file, 1667 lines (diff suppressed because it is too large)
@@ -1,659 +0,0 @@
#include "postgres.h"

#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"

#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"

#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>

#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif

/*
 * These variables are used similarly to openLogFile/SegNo, but for
 * walproposer to write the XLOG during recovery. walpropFileTLI is the
 * TimeLineID corresponding to the filename of walpropFile.
 */
static int	walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;

/* START cloned file-local variables and functions from walsender.c */

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
 */
static XLogRecPtr sentPtr = InvalidXLogRecPtr;

static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
/* END cloned file-level variables and functions from walsender.c */

int
CompareLsn(const void *a, const void *b)
{
	XLogRecPtr	lsn1 = *((const XLogRecPtr *) a);
	XLogRecPtr	lsn2 = *((const XLogRecPtr *) b);

	if (lsn1 < lsn2)
		return -1;
	else if (lsn1 == lsn2)
		return 0;
	else
		return 1;
}
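
CompareLsn has the comparator signature expected by qsort(). As an illustrative sketch (not from this diff), this is how a set of per-safekeeper flush LSNs can be reduced to the position a quorum is known to have reached; the helper name is hypothetical.

/* Sketch: the largest LSN that at least `quorum` safekeepers have reached. */
static XLogRecPtr
quorum_lsn_sketch(XLogRecPtr *lsns, int n, int quorum)
{
	qsort(lsns, n, sizeof(XLogRecPtr), CompareLsn);
	/* After an ascending sort, the top `quorum` entries are all >= this one. */
	return lsns[n - quorum];
}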

/* Returns a human-readable string corresponding to the SafekeeperState
 *
 * The string should not be freed.
 *
 * The strings are intended to be used as a prefix to "state", e.g.:
 *
 *   elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
 *
 * If this sort of phrasing doesn't fit the message, instead use something like:
 *
 *   elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
 */
char *
FormatSafekeeperState(SafekeeperState state)
{
	char	   *return_val = NULL;

	switch (state)
	{
		case SS_OFFLINE:
			return_val = "offline";
			break;
		case SS_CONNECTING_READ:
		case SS_CONNECTING_WRITE:
			return_val = "connecting";
			break;
		case SS_WAIT_EXEC_RESULT:
			return_val = "receiving query result";
			break;
		case SS_HANDSHAKE_RECV:
			return_val = "handshake (receiving)";
			break;
		case SS_VOTING:
			return_val = "voting";
			break;
		case SS_WAIT_VERDICT:
			return_val = "wait-for-verdict";
			break;
		case SS_SEND_ELECTED_FLUSH:
			return_val = "send-announcement-flush";
			break;
		case SS_IDLE:
			return_val = "idle";
			break;
		case SS_ACTIVE:
			return_val = "active";
			break;
	}

	Assert(return_val != NULL);

	return return_val;
}

/* Asserts that the provided events are expected for the given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
	uint32		expected = SafekeeperStateDesiredEvents(sk->state);

	/*
	 * The events are in-line with what we're expecting, under two conditions:
	 * (a) if we aren't expecting anything, `events` has no read- or
	 * write-ready component. (b) if we are expecting something, there's
	 * overlap (i.e. `events & expected != 0`)
	 */
	bool		events_ok_for_state;	/* long name so the `Assert` is more
										 * clear later */

	if (expected == WL_NO_EVENTS)
		events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
	else
		events_ok_for_state = ((events & expected) != 0);

	if (!events_ok_for_state)
	{
		/*
		 * To give a descriptive message in the case of failure, we use elog
		 * and then an assertion that's guaranteed to fail.
		 */
		elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
			 FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
		Assert(events_ok_for_state);
	}
}

/* Returns the set of events a safekeeper in this state should be waiting on
 *
 * This will return WL_NO_EVENTS (= 0) for some states. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
	uint32		result = WL_NO_EVENTS;

	/* If the state doesn't have a modifier, we can check the base state */
	switch (state)
	{
			/* Connecting states say what they want in the name */
		case SS_CONNECTING_READ:
			result = WL_SOCKET_READABLE;
			break;
		case SS_CONNECTING_WRITE:
			result = WL_SOCKET_WRITEABLE;
			break;

			/* Reading states need the socket to be read-ready to continue */
		case SS_WAIT_EXEC_RESULT:
		case SS_HANDSHAKE_RECV:
		case SS_WAIT_VERDICT:
			result = WL_SOCKET_READABLE;
			break;

			/*
			 * Idle states use read-readiness as a sign that the connection
			 * has been disconnected.
			 */
		case SS_VOTING:
		case SS_IDLE:
			result = WL_SOCKET_READABLE;
			break;

			/*
			 * Flush states require write-ready for flushing. Active state
			 * does both reading and writing.
			 *
			 * TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
			 * should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
			 */
		case SS_SEND_ELECTED_FLUSH:
		case SS_ACTIVE:
			result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
			break;

			/* The offline state expects no events. */
		case SS_OFFLINE:
			result = WL_NO_EVENTS;
			break;

		default:
			Assert(false);
			break;
	}

	return result;
}

/* Returns a human-readable string corresponding to the event set
 *
 * If the events do not correspond to something set as the `events` field of a
 * `WaitEvent`, the returned string may be meaningless.
 *
 * The string should not be freed. It should also not be expected to remain
 * the same between function calls. */
char *
FormatEvents(uint32 events)
{
	static char return_str[9];

	/* Helper variable to check if there's extra bits */
	uint32		all_flags = WL_LATCH_SET
		| WL_SOCKET_READABLE
		| WL_SOCKET_WRITEABLE
		| WL_TIMEOUT
		| WL_POSTMASTER_DEATH
		| WL_EXIT_ON_PM_DEATH
		| WL_SOCKET_CONNECTED;

	/*
	 * The formatting here isn't supposed to be *particularly* useful -- it's
	 * just to give a sense of what events have been triggered without
	 * needing to remember your powers of two.
	 */

	return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
	return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
	return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
	return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
	return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
	return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
	return_str[6] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';

	if (events & (~all_flags))
	{
		elog(WARNING, "Event formatting found unexpected component %d",
			 events & (~all_flags));
		return_str[7] = '*';
		return_str[8] = '\0';
	}
	else
		return_str[7] = '\0';

	return (char *) &return_str;
}

/*
 * Convert a character which represents a hexadecimal digit to an integer.
 *
 * Returns -1 if the character is not a hexadecimal digit.
 */
static int
HexDecodeChar(char c)
{
	if (c >= '0' && c <= '9')
		return c - '0';
	if (c >= 'a' && c <= 'f')
		return c - 'a' + 10;
	if (c >= 'A' && c <= 'F')
		return c - 'A' + 10;

	return -1;
}

/*
 * Decode a hex string into a byte string, 2 hex chars per byte.
 *
 * Returns false if invalid characters are encountered; otherwise true.
 */
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
	int			i;

	for (i = 0; i < nbytes; ++i)
	{
		int			n1 = HexDecodeChar(input[i * 2]);
		int			n2 = HexDecodeChar(input[i * 2 + 1]);

		if (n1 < 0 || n2 < 0)
			return false;
		result[i] = n1 * 16 + n2;
	}

	return true;
}
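
Illustrative usage (not from this diff): walproposer's tenant and timeline ids arrive as 32-character hex strings and decode to 16 raw bytes. The helper name and the hex literal below are made up.

static void
decode_tenant_id_sketch(void)
{
	uint8		tenant_id[16];

	/* 16 output bytes consume 32 hex characters */
	if (!HexDecodeString(tenant_id, "ee6016ecdf4f15a5f1b8dc7bbd5f6e6c", 16))
		elog(ERROR, "invalid hex in tenant id");
}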

/* --------------------------------
 * pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
 * --------------------------------
 */
uint32
pq_getmsgint32_le(StringInfo msg)
{
	uint32		n32;

	pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));

	return n32;
}

/* --------------------------------
 * pq_getmsgint64_le - get a binary 8-byte int from a message buffer in native (LE) order
 * --------------------------------
 */
uint64
pq_getmsgint64_le(StringInfo msg)
{
	uint64		n64;

	pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));

	return n64;
}

/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
	enlargeStringInfo(buf, sizeof(uint32));
	memcpy(buf->data + buf->len, &i, sizeof(uint32));
	buf->len += sizeof(uint32);
}

/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
	enlargeStringInfo(buf, sizeof(uint64));
	memcpy(buf->data + buf->len, &i, sizeof(uint64));
	buf->len += sizeof(uint64);
}
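
Illustrative round trip (not from this diff): as the comments above suggest, these _le variants read and write in native byte order (little-endian on typical targets), in contrast to the network-order pq_sendint64/pq_getmsgint64. The function name below is hypothetical.

static void
le_roundtrip_sketch(void)
{
	StringInfoData buf;
	uint64		term;

	initStringInfo(&buf);
	pq_sendint64_le(&buf, UINT64CONST(42));	/* e.g. a term number */

	buf.cursor = 0;				/* pretend the bytes just arrived */
	term = pq_getmsgint64_le(&buf);
	Assert(term == UINT64CONST(42));
	pfree(buf.data);
}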

/*
 * Write XLOG data to disk.
 */
void
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
	int			startoff;
	int			byteswritten;

	while (nbytes > 0)
	{
		int			segbytes;

		/* Close the current segment if it's completed */
		if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
			XLogWalPropClose(recptr);

		if (walpropFile < 0)
		{
#if PG_VERSION_NUM >= 150000
			/* FIXME Is it ok to use hardcoded value here? */
			TimeLineID	tli = 1;
#else
			bool		use_existent = true;
#endif
			/* Create/use new log file */
			XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
			walpropFile = XLogFileInit(walpropSegNo, tli);
			walpropFileTLI = tli;
#else
			walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
			walpropFileTLI = ThisTimeLineID;
#endif
		}

		/* Calculate the start offset of the received logs */
		startoff = XLogSegmentOffset(recptr, wal_segment_size);

		if (startoff + nbytes > wal_segment_size)
			segbytes = wal_segment_size - startoff;
		else
			segbytes = nbytes;

		/* OK to write the logs */
		errno = 0;

		byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
		if (byteswritten <= 0)
		{
			char		xlogfname[MAXFNAMELEN];
			int			save_errno;

			/* if write didn't set errno, assume no disk space */
			if (errno == 0)
				errno = ENOSPC;

			save_errno = errno;
			XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
			errno = save_errno;
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not write to log segment %s "
							"at offset %u, length %lu: %m",
							xlogfname, startoff, (unsigned long) segbytes)));
		}

		/* Update state for write */
		recptr += byteswritten;

		nbytes -= byteswritten;
		buf += byteswritten;
	}

	/*
	 * Close the current segment if it's fully written up in the last cycle of
	 * the loop.
	 */
	if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
	{
		XLogWalPropClose(recptr);
	}
}

/*
 * Close the current segment.
 */
void
XLogWalPropClose(XLogRecPtr recptr)
{
	Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));

	if (close(walpropFile) != 0)
	{
		char		xlogfname[MAXFNAMELEN];

		XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);

		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close log segment %s: %m",
						xlogfname)));
	}

	walpropFile = -1;
}

/* START of cloned functions from walsender.c */

/*
 * Subscribe for new WAL and stream it in the loop to safekeepers.
 *
 * At the moment, this never returns, but an ereport(ERROR) will take us back
 * to the main loop.
 */
void
StartProposerReplication(StartReplicationCmd *cmd)
{
	XLogRecPtr	FlushPtr;
	TimeLineID	currTLI;

#if PG_VERSION_NUM < 150000
	if (ThisTimeLineID == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif

	/*
	 * We assume here that we're logging enough information in the WAL for
	 * log-shipping, since this is checked in PostmasterMain().
	 *
	 * NOTE: wal_level can only change at shutdown, so in most cases it is
	 * difficult for there to be WAL data that we can still see that was
	 * written at wal_level='minimal'.
	 */

	if (cmd->slotname)
	{
		ReplicationSlotAcquire(cmd->slotname, true);
		if (SlotIsLogical(MyReplicationSlot))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot use a logical replication slot for physical replication")));

		/*
		 * We don't need to verify the slot's restart_lsn here; instead we
		 * rely on the caller requesting the starting point to use. If the
		 * WAL segment doesn't exist, we'll fail later.
		 */
	}

	/*
	 * Select the timeline. If it was given explicitly by the client, use
	 * that. Otherwise use the timeline of the last replayed record, which is
	 * kept in ThisTimeLineID.
	 *
	 * Neon doesn't currently use PG Timelines, but it may in the future, so
	 * we keep this code around to lighten the load for when we need it.
	 */
#if PG_VERSION_NUM >= 150000
	FlushPtr = GetFlushRecPtr(&currTLI);
#else
	FlushPtr = GetFlushRecPtr();
	currTLI = ThisTimeLineID;
#endif

	/*
	 * When we first start replication the standby will be behind the
	 * primary. For some applications, for example synchronous replication,
	 * it is important to have a clear state for this initial catchup mode,
	 * so we can trigger actions when we change streaming state later. We may
	 * stay in this state for a long time, which is exactly why we want to be
	 * able to monitor whether or not we are still here.
	 */
	WalSndSetState(WALSNDSTATE_CATCHUP);

	/*
	 * Don't allow a request to stream from a future point in WAL that hasn't
	 * been flushed to disk in this server yet.
	 */
	if (FlushPtr < cmd->startpoint)
	{
		ereport(ERROR,
				(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
						LSN_FORMAT_ARGS(cmd->startpoint),
						LSN_FORMAT_ARGS(FlushPtr))));
	}

	/* Start streaming from the requested point */
	sentPtr = cmd->startpoint;

	/* Initialize shared memory status, too */
	SpinLockAcquire(&MyWalSnd->mutex);
	MyWalSnd->sentPtr = sentPtr;
	SpinLockRelease(&MyWalSnd->mutex);

	SyncRepInitConfig();

	/* Infinite send loop, never returns */
	WalSndLoop();

	WalSndSetState(WALSNDSTATE_STARTUP);

	if (cmd->slotname)
		ReplicationSlotRelease();
}

/*
 * Main loop that waits for LSN updates and calls the walproposer.
 * Synchronous replication sets latch in WalSndWakeup at walsender.c
 */
static void
WalSndLoop(void)
{
	/* Clear any already-pending wakeups */
	ResetLatch(MyLatch);

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		XLogBroadcastWalProposer();

		if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			WalSndSetState(WALSNDSTATE_STREAMING);
		WalProposerPoll();
	}
}

/*
 * Notify walproposer about the new WAL position.
 */
static void
XLogBroadcastWalProposer(void)
{
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;

	/* Start from the last sent position */
	startptr = sentPtr;

	/*
	 * Streaming the current timeline on a primary.
	 *
	 * Attempt to send all data that's already been written out and fsync'd
	 * to disk. We cannot go further than what's been written out given the
	 * current implementation of WALRead(). And in any case it's unsafe to
	 * send WAL that is not securely down to disk on the primary: if the
	 * primary subsequently crashes and restarts, standbys must not have
	 * applied any WAL that got lost on the primary.
	 */
#if PG_VERSION_NUM >= 150000
	endptr = GetFlushRecPtr(NULL);
#else
	endptr = GetFlushRecPtr();
#endif

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here. We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead. All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(endptr, GetCurrentTimestamp());

	/* Do we have any work to do? */
	Assert(startptr <= endptr);
	if (endptr <= startptr)
		return;

	WalProposerBroadcast(startptr, endptr);
	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 LSN_FORMAT_ARGS(sentPtr));
		set_ps_display(activitymsg);
	}
}

@@ -1,19 +0,0 @@
#ifndef __NEON_WALPROPOSER_UTILS_H__
#define __NEON_WALPROPOSER_UTILS_H__

#include "walproposer.h"

int			CompareLsn(const void *a, const void *b);
char	   *FormatSafekeeperState(SafekeeperState state);
void		AssertEventsOkForState(uint32 events, Safekeeper *sk);
uint32		SafekeeperStateDesiredEvents(SafekeeperState state);
char	   *FormatEvents(uint32 events);
bool		HexDecodeString(uint8 *result, char *input, int nbytes);
uint32		pq_getmsgint32_le(StringInfo msg);
uint64		pq_getmsgint64_le(StringInfo msg);
void		pq_sendint32_le(StringInfo buf, uint32 i);
void		pq_sendint64_le(StringInfo buf, uint64 i);
void		XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void		XLogWalPropClose(XLogRecPtr recptr);

#endif /* __NEON_WALPROPOSER_UTILS_H__ */

@@ -89,7 +89,10 @@ pub mod errors {
                Self::Console {
                    status: http::StatusCode::LOCKED,
                    ref text,
                } => !text.contains("quota"),
                } => {
                    !text.contains("written data quota exceeded")
                        && !text.contains("the limit for current plan reached")
                }
                // retry server errors
                Self::Console { status, .. } if status.is_server_error() => true,
                _ => false,

@@ -723,9 +723,9 @@ impl Timeline {
            if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
                return Ok(()); // nothing to do
            }
            let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);

            // release the lock before removing
            remover
            shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
        };

        // delete old WAL files

@@ -1085,15 +1085,32 @@ class AbstractNeonCli(abc.ABC):
            stderr=subprocess.PIPE,
            timeout=timeout,
        )

        indent = " "
        if not res.returncode:
            log.info(f"Run {res.args} success: {res.stdout}")
            stripped = res.stdout.strip()
            lines = stripped.splitlines()
            if len(lines) < 2:
                log.debug(f"Run {res.args} success: {stripped}")
            else:
                log.debug("Run %s success:\n%s" % (res.args, textwrap.indent(stripped, indent)))
        elif check_return_code:
            # this way command output will be recorded and shown in the CI failure message
            msg = f"""\
            Run {res.args} failed:
              stdout: {res.stdout}
              stderr: {res.stderr}
            indent = indent * 2
            msg = textwrap.dedent(
                """\
                Run %s failed:
                  stdout:
                %s
                  stderr:
                %s
                """
            )
            msg = msg % (
                res.args,
                textwrap.indent(res.stdout.strip(), indent),
                textwrap.indent(res.stderr.strip(), indent),
            )
            log.info(msg)
            raise RuntimeError(msg) from subprocess.CalledProcessError(
                res.returncode, res.args, res.stdout, res.stderr
@@ -1447,6 +1464,29 @@ class NeonCli(AbstractNeonCli):

        return self.raw_cli(args, check_return_code=check_return_code)

    def map_branch(
        self, name: str, tenant_id: TenantId, timeline_id: TimelineId
    ) -> "subprocess.CompletedProcess[str]":
        """
        Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
        Usually needed when creating branches via PageserverHttpClient and not neon_local.

        After creating a name mapping, you can use EndpointFactory.create_start
        with this registered branch name.
        """
        args = [
            "mappings",
            "map",
            "--branch-name",
            name,
            "--tenant-id",
            str(tenant_id),
            "--timeline-id",
            str(timeline_id),
        ]

        return self.raw_cli(args, check_return_code=True)

    def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
        return self.raw_cli(["start"], check_return_code=check_return_code)

@@ -74,11 +74,14 @@ def wait_until_tenant_state(
    for _ in range(iterations):
        try:
            tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
        except Exception as e:
            log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
        else:
            log.debug(f"Tenant {tenant_id} data: {tenant}")
            if tenant["state"]["slug"] == expected_state:
                return tenant
        except Exception as e:
            log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
            if tenant["state"]["slug"] == "Broken":
                raise RuntimeError(f"tenant became Broken, not {expected_state}")

        time.sleep(period)

@@ -1,14 +1,24 @@
import random
import threading
import time
from typing import List
from queue import SimpleQueue
from typing import Any, Dict, List, Union

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.types import Lsn
from fixtures.neon_fixtures import (
    Endpoint,
    NeonEnv,
    NeonEnvBuilder,
    PgBin,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError


# Test branch creation
@@ -128,3 +138,245 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
    endpoint1 = env.endpoints.create_start("b1")

    pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])


def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
    """
    It should not be possible to create an endpoint, because the branch has not been uploaded.
    """

    env = neon_env_builder.init_configs()
    env.start()

    env.pageserver.allowed_errors.append(
        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
    )
    env.pageserver.allowed_errors.append(
        ".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
    )
    ps_http = env.pageserver.http_client()

    # pause all uploads
    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
    ps_http.tenant_create(env.initial_tenant)

    initial_branch = "initial_branch"

    def start_creating_timeline():
        with pytest.raises(RequestException):
            ps_http.timeline_create(
                env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
            )

    t = threading.Thread(target=start_creating_timeline)
    try:
        t.start()

        wait_until_paused(env, "before-upload-index-pausable")

        env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)

        with pytest.raises(RuntimeError, match="is not active, state: Loading"):
            env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
    finally:
        # FIXME: paused uploads bother shutdown
        env.pageserver.stop(immediate=True)

        t.join()

def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
    """
    It should not be possible to create a branch, because its ancestor has not been uploaded.
    """

    env = neon_env_builder.init_configs()
    env.start()

    env.pageserver.allowed_errors.append(
        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
    )
    ps_http = env.pageserver.http_client()

    # pause all uploads
    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
    ps_http.tenant_create(env.initial_tenant)

    def start_creating_timeline():
        with pytest.raises(RequestException):
            ps_http.timeline_create(
                env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
            )

    t = threading.Thread(target=start_creating_timeline)
    try:
        t.start()

        wait_until_paused(env, "before-upload-index-pausable")

        branch_id = TimelineId.generate()

        with pytest.raises(RetryError, match="too many 503 error responses"):
            ps_http.timeline_create(
                env.pg_version,
                env.initial_tenant,
                branch_id,
                ancestor_timeline_id=env.initial_timeline,
            )

        with pytest.raises(
            PageserverApiException,
            match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
        ):
            ps_http.timeline_detail(env.initial_tenant, branch_id)
        # important to note that a task might still be in progress to complete
        # the work, but will never get to that because we have the pause
        # failpoint
    finally:
        # FIXME: paused uploads bother shutdown
        env.pageserver.stop(immediate=True)

        t.join()

def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
    """
    If "activate only after upload" is used, then retries can end up competing.
    """

    env = neon_env_builder.init_configs()
    env.start()

    env.pageserver.allowed_errors.append(
        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
    )
    env.pageserver.allowed_errors.append(
        ".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
    )
    ps_http = env.pageserver.http_client()

    # pause all uploads
    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
    ps_http.tenant_create(env.initial_tenant)

    def start_creating_timeline():
        ps_http.timeline_create(
            env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
        )

    create_root = threading.Thread(target=start_creating_timeline)

    branch_id = TimelineId.generate()

    queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
    barrier = threading.Barrier(3)

    def try_branch():
        barrier.wait()
        barrier.wait()
        try:
            ret = ps_http.timeline_create(
                env.pg_version,
                env.initial_tenant,
                branch_id,
                ancestor_timeline_id=env.initial_timeline,
                timeout=5,
            )
            queue.put(ret)
        except Exception as e:
            queue.put(e)

    threads = [threading.Thread(target=try_branch) for _ in range(2)]

    try:
        create_root.start()

        for t in threads:
            t.start()

        wait_until_paused(env, "before-upload-index-pausable")

        barrier.wait()
        ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
        barrier.wait()

        # now both requests race to branch; only one can win, because they take gc_cs, Tenant::timelines or marker files
        first = queue.get()
        second = queue.get()

        log.info(first)
        log.info(second)

        (succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
        assert isinstance(failed, Exception)
        assert isinstance(succeeded, Dict)

        # FIXME: there's probably multiple valid status codes:
        # - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
        # - whatever the 409 response says, but that is a subclass of PageserverApiException
        assert isinstance(failed, PageserverApiException)
        assert succeeded["state"] == "Active"
    finally:
        # we might still have the failpoint active
        env.pageserver.stop(immediate=True)

        # pytest should nag if we leave threads unjoined
        for t in threads:
            t.join()
        create_root.join()

def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
    """
    Currently, before RFC#27, we keep and continue uploading branches which were not successfully uploaded before shutdown.

    This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the RFC is implemented.
    """

    env = neon_env_builder.init_configs()
    env.start()

    env.pageserver.allowed_errors.append(
        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
    )
    ps_http = env.pageserver.http_client()

    # pause all uploads
    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
    ps_http.tenant_create(env.initial_tenant)

    def start_creating_timeline():
        with pytest.raises(RequestException):
            ps_http.timeline_create(
                env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
            )

    t = threading.Thread(target=start_creating_timeline)
    try:
        t.start()

        wait_until_paused(env, "before-upload-index-pausable")
    finally:
        # FIXME: paused uploads bother shutdown
        env.pageserver.stop(immediate=True)
        t.join()

    # now without a failpoint
    env.pageserver.start()

    wait_until_tenant_active(ps_http, env.initial_tenant)

    # currently it lives on and will eventually get uploaded, but this will change
    detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
    assert detail["state"] == "Active"


def wait_until_paused(env: NeonEnv, failpoint: str):
    found = False
    msg = f"at failpoint {failpoint}"
    for _ in range(20):
        time.sleep(1)
        found = env.pageserver.log_contains(msg) is not None
        if found:
            break
    assert found

@@ -3,7 +3,10 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.pageserver.utils import (
    wait_for_upload_queue_empty,
    wait_until_tenant_active,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError

@@ -113,6 +116,8 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
        time.sleep(1)

    env.pageserver.start()
    wait_until_tenant_active(pageserver_http, tenant_id)

    message = f".*duplicated L1 layer layer={l1_found.name}"
    env.pageserver.allowed_errors.append(message)

@@ -10,6 +10,7 @@ of the pageserver are:
"""


import enum
import re
import time
from typing import Optional
@@ -81,7 +82,7 @@ def generate_uploads_and_deletions(
        f"""
        INSERT INTO foo (id, val)
        SELECT g, '{data}'
        FROM generate_series(1, 20000) g
        FROM generate_series(1, 200) g
        ON CONFLICT (id) DO UPDATE
        SET val = EXCLUDED.val
        """,
@@ -116,6 +117,10 @@ def get_deletion_queue_submitted(ps_http) -> int:
    return get_metric_or_0(ps_http, "pageserver_deletion_queue_submitted_total")


def get_deletion_queue_validated(ps_http) -> int:
    return get_metric_or_0(ps_http, "pageserver_deletion_queue_validated_total")


def get_deletion_queue_dropped(ps_http) -> int:
    return get_metric_or_0(ps_http, "pageserver_deletion_queue_dropped_total")

@@ -272,13 +277,29 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
    assert get_deletion_queue_unexpected_errors(ps_http) == 0


@pytest.mark.parametrize("keep_attachment", [True, False])
class KeepAttachment(str, enum.Enum):
    KEEP = "keep"
    LOSE = "lose"


class ValidateBefore(str, enum.Enum):
    VALIDATE = "validate"
    NO_VALIDATE = "no-validate"


@pytest.mark.parametrize("keep_attachment", [KeepAttachment.KEEP, KeepAttachment.LOSE])
@pytest.mark.parametrize("validate_before", [ValidateBefore.VALIDATE, ValidateBefore.NO_VALIDATE])
def test_deletion_queue_recovery(
    neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool
    neon_env_builder: NeonEnvBuilder,
    pg_bin: PgBin,
    keep_attachment: KeepAttachment,
    validate_before: ValidateBefore,
):
    """
    :param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
    :param keep_attachment: whether to re-attach after restart. Else, we act as if some other
    node took the attachment while we were restarting.
    :param validate_before: whether to wait for deletions to be validated before restart. This
    makes them eligible to be executed after restart, if the same node keeps the attachment.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
@@ -288,12 +309,20 @@ def test_deletion_queue_recovery(

    ps_http = env.pageserver.http_client()

    # Prevent deletion lists from being executed, to build up some backlog of deletions
    ps_http.configure_failpoints(
        [
            ("deletion-queue-before-execute", "return"),
        ]
    )
    failpoints = [
        # Prevent deletion lists from being executed, to build up some backlog of deletions
        ("deletion-queue-before-execute", "return"),
    ]

    if validate_before == ValidateBefore.NO_VALIDATE:
        failpoints.append(
            # Prevent deletion lists from being validated; we will test that they are
            # dropped properly during recovery. 'pause' is okay here because we kill
            # the pageserver with immediate=true
            ("control-plane-client-validate", "pause")
        )

    ps_http.configure_failpoints(failpoints)

    generate_uploads_and_deletions(env)

@@ -305,10 +334,25 @@ def test_deletion_queue_recovery(
    assert get_deletion_queue_unexpected_errors(ps_http) == 0
    assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0

    if validate_before == ValidateBefore.VALIDATE:

        def assert_validation_complete():
            assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)

        wait_until(20, 1, assert_validation_complete)

        # The validated keys statistic advances before the header is written, so we
        # also wait to see the header hit the disk: this seems paranoid but the race
        # can really happen on a heavily overloaded test machine.
        def assert_header_written():
            assert (env.pageserver.workdir / "deletion" / "header-01").exists()

        wait_until(20, 1, assert_header_written)

    log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
    env.pageserver.stop(immediate=True)

    if not keep_attachment:
    if keep_attachment == KeepAttachment.LOSE:
        some_other_pageserver = 101010
        assert env.attachment_service is not None
        env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
@@ -327,14 +371,17 @@ def test_deletion_queue_recovery(
    ps_http.deletion_queue_flush(execute=True)
    wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))

    if keep_attachment:
        # If we kept the attachment, then our pre-restart deletions should have executed
        # successfully
    if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
        # - If we kept the attachment, then our pre-restart deletions should execute
        #   because on re-attach they were from the immediately preceding generation
        # - If we validated before restart, then the deletions should execute because the
        #   deletion queue header records a validated deletion list sequence number.
        assert get_deletion_queue_executed(ps_http) == before_restart_depth
    else:
        env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])

        # If we lost the attachment, we should have dropped our pre-restart deletions.
        assert get_deletion_queue_dropped(ps_http) == before_restart_depth
        env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])

    assert get_deletion_queue_unexpected_errors(ps_http) == 0
    assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
@@ -350,3 +397,73 @@ def test_deletion_queue_recovery(
 
     assert get_deletion_queue_unexpected_errors(ps_http) == 0
     assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
+
+
+def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    neon_env_builder.enable_generations = True
+    neon_env_builder.enable_pageserver_remote_storage(
+        RemoteStorageKind.MOCK_S3,
+    )
+    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
+
+    ps_http = env.pageserver.http_client()
+
+    generate_uploads_and_deletions(env)
+
+    env.pageserver.allowed_errors.extend(
+        [
+            # When the pageserver can't reach the control plane, it will complain
+            ".*calling control plane generation validation API failed.*",
+            # Emergency mode is a big deal, we log errors whenever it is used.
+            ".*Emergency mode!.*",
+        ]
+    )
+
+    # Simulate a major incident: the control plane goes offline
+    assert env.attachment_service is not None
+    env.attachment_service.stop()
+
+    # Remember how many validations had happened before the control plane went offline
+    validated = get_deletion_queue_validated(ps_http)
+
+    generate_uploads_and_deletions(env, init=False)
+
+    # The running pageserver should stop progressing deletions
+    time.sleep(10)
+    assert get_deletion_queue_validated(ps_http) == validated
+
+    # Restart the pageserver: ordinarily we would _avoid_ doing this during such an
+    # incident, but it might be unavoidable: if so, we want to be able to start up
+    # and serve clients.
+    env.pageserver.stop()  # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
+    env.pageserver.start(
+        overrides=("--pageserver-config-override=control_plane_emergency_mode=true",)
+    )
+
+    # The pageserver should provide service to clients
+    generate_uploads_and_deletions(env, init=False)
+
+    # The pageserver should neither validate nor execute any deletions, it should have
+    # loaded the DeletionLists from before though
+    time.sleep(10)
+    assert get_deletion_queue_depth(ps_http) > 0
+    assert get_deletion_queue_validated(ps_http) == 0
+    assert get_deletion_queue_executed(ps_http) == 0
+
+    # When the control plane comes back up, normal service should resume
+    env.attachment_service.start()
+
+    ps_http.deletion_queue_flush(execute=True)
+    assert get_deletion_queue_depth(ps_http) == 0
+    assert get_deletion_queue_validated(ps_http) > 0
+    assert get_deletion_queue_executed(ps_http) > 0
+
+    # The pageserver should work fine when subsequently restarted in non-emergency mode
+    env.pageserver.stop()  # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
+    env.pageserver.start()
+
+    generate_uploads_and_deletions(env, init=False)
+    ps_http.deletion_queue_flush(execute=True)
+    assert get_deletion_queue_depth(ps_http) == 0
+    assert get_deletion_queue_validated(ps_http) > 0
+    assert get_deletion_queue_executed(ps_http) > 0
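
The two time.sleep(10) blocks above assert that deletion-queue progress stays flat while the control plane is unreachable. A minimal polling alternative, assuming only the get_deletion_queue_validated helper the test already uses (the fixed sleep in the diff is the actual approach; this is just a sketch):

import time

def assert_validation_stalled(ps_http, baseline: int, seconds: int = 10):
    # Poll instead of sleeping once, so an unexpected validation fails the
    # test as soon as it happens rather than after the full wait.
    deadline = time.time() + seconds
    while time.time() < deadline:
        assert get_deletion_queue_validated(ps_http) == baseline
        time.sleep(1)
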
@@ -17,6 +17,8 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
     n_restarts = 10
     scale = 10
 
+    env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
+
     def run_pgbench(connstr: str):
         log.info(f"Start a pgbench workload on pg {connstr}")
         pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
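
The hunk above only adds an allowed-errors pattern and shows the pgbench initialization step. Going by the test name, the surrounding (unchanged) code presumably drives this workload in the background while restarting the pageserver; a hedged sketch of that shape, where the thread and the restart loop are assumptions and only run_pgbench, n_restarts, and env.pageserver come from the hunk:

import threading

def restart_pageserver_under_workload(env, connstr: str, n_restarts: int):
    # Run pgbench in the background while the pageserver restarts repeatedly,
    # so queries race against shutdown (hence the allowed "Shutting down" error).
    workload = threading.Thread(target=run_pgbench, args=(connstr,))
    workload.start()
    for _ in range(n_restarts):
        env.pageserver.stop()
        env.pageserver.start()
    workload.join()
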
@@ -45,14 +45,11 @@ def test_tenant_delete_smoke(
         [
             # The deletion queue will complain when it encounters simulated S3 errors
             ".*deletion executor: DeleteObjects request failed.*",
+            # lucky race with stopping from flushing a layer we fail to schedule any uploads
+            ".*layer flush task.+: could not flush frozen layer: update_metadata_file",
         ]
     )
 
-    # lucky race with stopping from flushing a layer we fail to schedule any uploads
-    env.pageserver.allowed_errors.append(
-        ".*layer flush task.+: could not flush frozen layer: update_metadata_file"
-    )
-
     ps_http = env.pageserver.http_client()
 
     # first try to delete non existing tenant
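
Both sides of this hunk rely on allowed_errors being a list of regexes that whitelist otherwise test-failing log lines; the change only folds the separate append into the existing extend list. A minimal sketch of the matching semantics this implies (the actual fixture implementation is not part of this diff):

import re

def is_allowed_error(line: str, allowed_errors: list[str]) -> bool:
    # A pageserver log line is tolerated if any allowed pattern matches it.
    return any(re.match(pattern, line) for pattern in allowed_errors)
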
@@ -194,11 +191,9 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
     )
 
     if simulate_failures:
-        env.pageserver.allowed_errors.extend(
-            [
-                # The deletion queue will complain when it encounters simulated S3 errors
-                ".*deletion executor: DeleteObjects request failed.*",
-            ]
+        env.pageserver.allowed_errors.append(
+            # The deletion queue will complain when it encounters simulated S3 errors
+            ".*deletion executor: DeleteObjects request failed.*",
         )
 
     ps_http = env.pageserver.http_client()

@@ -293,6 +288,10 @@ def test_tenant_delete_is_resumed_on_attach(
     neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
 
     env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
+    env.pageserver.allowed_errors.append(
+        # lucky race with stopping from flushing a layer we fail to schedule any uploads
+        ".*layer flush task.+: could not flush frozen layer: update_metadata_file"
+    )
 
     tenant_id = env.initial_tenant
 
@@ -752,6 +752,9 @@ def test_ignore_while_attaching(
     env.pageserver.allowed_errors.append(
         f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
     )
+    # An endpoint is starting up concurrently with our detach, it can
+    # experience RPC failure due to shutdown.
+    env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
 
     data_id = 1
     data_secret = "very secret secret"