mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
Compare commits
8 Commits
yuchen/lsn
...
wp-mref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73a525de62 | ||
|
|
62c0c1d797 | ||
|
|
4feb6ba29c | ||
|
|
29a41fc7b9 | ||
|
|
d8b2a49c55 | ||
|
|
ed9ffb9af2 | ||
|
|
6c6a7f9ace | ||
|
|
e729f28205 |
6
.github/workflows/build_and_test.yml
vendored
6
.github/workflows/build_and_test.yml
vendored
@@ -299,21 +299,21 @@ jobs:
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile', 'Dockerfile.build-tools') }}
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5158,6 +5158,7 @@ dependencies = [
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
|
||||
@@ -69,8 +69,6 @@ RUN set -e \
|
||||
&& apt install -y \
|
||||
libreadline-dev \
|
||||
libseccomp-dev \
|
||||
libicu67 \
|
||||
openssl \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
|
||||
&& useradd -d /data neon \
|
||||
|
||||
@@ -112,6 +112,45 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
|
||||
&& make install \
|
||||
&& rm -rf ../lcov.tar.gz
|
||||
|
||||
# Compile and install the static OpenSSL library
|
||||
ENV OPENSSL_VERSION=3.2.2
|
||||
ENV OPENSSL_PREFIX=/usr/local/openssl
|
||||
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
echo "197149c18d9e9f292c43f0400acaba12e5f52cacfe050f3d199277ea738ec2e7 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
|
||||
cd /tmp && \
|
||||
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
cd /tmp/openssl-${OPENSSL_VERSION} && \
|
||||
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
cd /tmp && \
|
||||
rm -rf /tmp/openssl-${OPENSSL_VERSION}
|
||||
|
||||
# Use the same version of libicu as the compute nodes so that
|
||||
# clusters created using inidb on pageserver can be used by computes.
|
||||
#
|
||||
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
|
||||
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
|
||||
# Debian has a few patches on top of 67.1 that we're not adding here.
|
||||
ENV ICU_VERSION=67.1
|
||||
ENV ICU_PREFIX=/usr/local/icu
|
||||
|
||||
# Download and build static ICU
|
||||
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
|
||||
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
|
||||
mkdir /tmp/icu && \
|
||||
pushd /tmp/icu && \
|
||||
tar -xzf /tmp/libicu-${ICU_VERSION}.tgz && \
|
||||
pushd icu/source && \
|
||||
./configure --prefix=${ICU_PREFIX} --enable-static --enable-shared=no CXXFLAGS="-fPIC" CFLAGS="-fPIC" && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
popd && \
|
||||
rm -rf icu && \
|
||||
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
|
||||
popd
|
||||
|
||||
# Switch to nonroot user
|
||||
USER nonroot:nonroot
|
||||
WORKDIR /home/nonroot
|
||||
@@ -170,3 +209,6 @@ RUN whoami \
|
||||
&& rustup --version --verbose \
|
||||
&& rustc --version --verbose \
|
||||
&& clang --version
|
||||
|
||||
# Set following flag to check in Makefile if its running in Docker
|
||||
RUN touch /home/nonroot/.docker_build
|
||||
|
||||
15
Makefile
15
Makefile
@@ -3,6 +3,9 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
||||
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
|
||||
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
|
||||
|
||||
OPENSSL_PREFIX_DIR := /usr/local/openssl
|
||||
ICU_PREFIX_DIR := /usr/local/icu
|
||||
|
||||
#
|
||||
# We differentiate between release / debug build types using the BUILD_TYPE
|
||||
# environment variable.
|
||||
@@ -20,6 +23,16 @@ else
|
||||
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
|
||||
endif
|
||||
|
||||
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
|
||||
# Exclude static build openssl, icu for local build (MacOS, Linux)
|
||||
# Only keep for build type release and debug
|
||||
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
|
||||
PG_CONFIGURE_OPTS += --with-icu
|
||||
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
|
||||
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
|
||||
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
|
||||
endif
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S),Linux)
|
||||
# Seccomp BPF is only available for Linux
|
||||
@@ -28,7 +41,7 @@ else ifeq ($(UNAME_S),Darwin)
|
||||
ifndef DISABLE_HOMEBREW
|
||||
# macOS with brew-installed openssl requires explicit paths
|
||||
# It can be configured with OPENSSL_PREFIX variable
|
||||
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
|
||||
OPENSSL_PREFIX := $(shell brew --prefix openssl@3)
|
||||
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
|
||||
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
|
||||
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
|
||||
|
||||
@@ -918,38 +918,39 @@ impl ComputeNode {
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are reconfiguring:
|
||||
// creating new extensions, roles, etc...
|
||||
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
|
||||
self.pg_reload_conf()?;
|
||||
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(
|
||||
&spec,
|
||||
&mut client,
|
||||
self.connstr.as_str(),
|
||||
self.has_feature(ComputeFeature::AnonExtension),
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
// instead of reconfigure.
|
||||
}
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(
|
||||
&spec,
|
||||
&mut client,
|
||||
self.connstr.as_str(),
|
||||
self.has_feature(ComputeFeature::AnonExtension),
|
||||
)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
// We can skip handle_migrations here because a new migration can only appear
|
||||
// if we have a new version of the compute_ctl binary, which can only happen
|
||||
// if compute got restarted, in which case we'll end up inside of apply_config
|
||||
// instead of reconfigure.
|
||||
}
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// reset max_cluster_size in config back to original value and reload config
|
||||
config::compute_ctl_temp_override_remove(pgdata_path)?;
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
@@ -1040,12 +1041,17 @@ impl ComputeNode {
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
|
||||
self.pg_reload_conf()?;
|
||||
config::with_compute_ctl_tmp_override(
|
||||
pgdata_path,
|
||||
"neon.max_cluster_size=-1",
|
||||
|| {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
self.apply_config(&compute_state)?;
|
||||
self.apply_config(&compute_state)?;
|
||||
|
||||
config::compute_ctl_temp_override_remove(pgdata_path)?;
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
|
||||
@@ -131,18 +131,17 @@ pub fn write_postgres_conf(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// create file compute_ctl_temp_override.conf in pgdata_dir
|
||||
/// add provided options to this file
|
||||
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
|
||||
pub fn with_compute_ctl_tmp_override<F>(pgdata_path: &Path, options: &str, exec: F) -> Result<()>
|
||||
where
|
||||
F: FnOnce() -> Result<()>,
|
||||
{
|
||||
let path = pgdata_path.join("compute_ctl_temp_override.conf");
|
||||
let mut file = File::create(path)?;
|
||||
write!(file, "{}", options)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// remove file compute_ctl_temp_override.conf in pgdata_dir
|
||||
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
|
||||
let path = pgdata_path.join("compute_ctl_temp_override.conf");
|
||||
std::fs::remove_file(path)?;
|
||||
Ok(())
|
||||
let res = exec();
|
||||
|
||||
file.set_len(0)?;
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ use hyper::header::CONTENT_TYPE;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use tokio::task;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
use utils::http::request::must_get_query_param;
|
||||
|
||||
@@ -48,7 +48,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
match (req.method(), req.uri().path()) {
|
||||
// Serialized compute state.
|
||||
(&Method::GET, "/status") => {
|
||||
info!("serving /status GET request");
|
||||
debug!("serving /status GET request");
|
||||
let state = compute.state.lock().unwrap();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
|
||||
|
||||
@@ -862,20 +862,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
|
||||
let allow_multiple = sub_args.get_flag("allow-multiple");
|
||||
|
||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||
let safekeepers =
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
let mut safekeepers: Vec<NodeId> = Vec::new();
|
||||
for sk_id in safekeepers_str.split(',').map(str::trim) {
|
||||
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
|
||||
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
|
||||
})?);
|
||||
safekeepers.push(sk_id);
|
||||
}
|
||||
safekeepers
|
||||
} else {
|
||||
env.safekeepers.iter().map(|sk| sk.id).collect()
|
||||
};
|
||||
// If --safekeepers argument is given, use only the listed
|
||||
// safekeeper nodes; otherwise all from the env.
|
||||
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&sub_args)? {
|
||||
safekeepers
|
||||
} else {
|
||||
env.safekeepers.iter().map(|sk| sk.id).collect()
|
||||
};
|
||||
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
@@ -979,7 +972,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
endpoint.reconfigure(pageservers, None).await?;
|
||||
// If --safekeepers argument is given, use only the listed
|
||||
// safekeeper nodes; otherwise all from the env.
|
||||
let safekeepers = parse_safekeepers(&sub_args)?;
|
||||
endpoint.reconfigure(pageservers, None, safekeepers).await?;
|
||||
}
|
||||
"stop" => {
|
||||
let endpoint_id = sub_args
|
||||
@@ -1001,6 +997,23 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parse --safekeepers as list of safekeeper ids.
|
||||
fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
let mut safekeepers: Vec<NodeId> = Vec::new();
|
||||
for sk_id in safekeepers_str.split(',').map(str::trim) {
|
||||
let sk_id = NodeId(
|
||||
u64::from_str(sk_id)
|
||||
.map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
|
||||
);
|
||||
safekeepers.push(sk_id);
|
||||
}
|
||||
Ok(Some(safekeepers))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match sub_match.subcommand() {
|
||||
Some(ep_subcommand_data) => ep_subcommand_data,
|
||||
@@ -1573,7 +1586,7 @@ fn cli() -> Command {
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(endpoint_pageserver_id_arg.clone())
|
||||
.arg(safekeepers_arg)
|
||||
.arg(safekeepers_arg.clone())
|
||||
.arg(remote_ext_config_args)
|
||||
.arg(create_test_user)
|
||||
.arg(allow_multiple.clone())
|
||||
@@ -1581,6 +1594,7 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("reconfigure")
|
||||
.about("Reconfigure the endpoint")
|
||||
.arg(endpoint_pageserver_id_arg)
|
||||
.arg(safekeepers_arg)
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
)
|
||||
|
||||
@@ -499,6 +499,23 @@ impl Endpoint {
|
||||
.join(",")
|
||||
}
|
||||
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in sk_ids {
|
||||
let sk = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
}
|
||||
}
|
||||
Ok(safekeeper_connstrings)
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
@@ -523,18 +540,7 @@ impl Endpoint {
|
||||
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstring.is_empty());
|
||||
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in safekeepers {
|
||||
let sk = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
}
|
||||
}
|
||||
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
|
||||
|
||||
// check for file remote_extensions_spec.json
|
||||
// if it is present, read it and pass to compute_ctl
|
||||
@@ -741,6 +747,7 @@ impl Endpoint {
|
||||
&self,
|
||||
mut pageservers: Vec<(Host, u16)>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
safekeepers: Option<Vec<NodeId>>,
|
||||
) -> Result<()> {
|
||||
let mut spec: ComputeSpec = {
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
@@ -775,6 +782,12 @@ impl Endpoint {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
|
||||
// If safekeepers are not specified, don't change them.
|
||||
if let Some(safekeepers) = safekeepers {
|
||||
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
|
||||
spec.safekeeper_connstrings = safekeeper_connstrings;
|
||||
}
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
|
||||
@@ -14,6 +14,7 @@ use camino::Utf8PathBuf;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use utils::auth::{Claims, Scope};
|
||||
use utils::{http::error::HttpErrorBody, id::NodeId};
|
||||
|
||||
use crate::{
|
||||
@@ -197,7 +198,7 @@ impl SafekeeperNode {
|
||||
&datadir,
|
||||
&self.env.safekeeper_bin(),
|
||||
&args,
|
||||
[],
|
||||
self.safekeeper_env_variables()?,
|
||||
background_process::InitialPidFile::Expect(self.pid_file()),
|
||||
|| async {
|
||||
match self.check_status().await {
|
||||
@@ -210,6 +211,18 @@ impl SafekeeperNode {
|
||||
.await
|
||||
}
|
||||
|
||||
fn safekeeper_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
|
||||
// Generate a token to connect from safekeeper to peers
|
||||
if self.conf.auth_enabled {
|
||||
let token = self
|
||||
.env
|
||||
.generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
|
||||
Ok(vec![("SAFEKEEPER_AUTH_TOKEN".to_owned(), token)])
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Stop the server.
|
||||
///
|
||||
|
||||
@@ -25,6 +25,8 @@ pub struct Config {
|
||||
///
|
||||
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
|
||||
memory_history_log_interval: usize,
|
||||
/// The max number of iterations to skip before logging the next iteration
|
||||
memory_history_log_noskip_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -33,6 +35,7 @@ impl Default for Config {
|
||||
memory_poll_interval: Duration::from_millis(100),
|
||||
memory_history_len: 5, // use 500ms of history for decision-making
|
||||
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
|
||||
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 60 seconds have passed
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,7 +88,12 @@ impl CgroupWatcher {
|
||||
|
||||
// buffer for samples that will be logged. once full, it remains so.
|
||||
let history_log_len = self.config.memory_history_log_interval;
|
||||
let max_skip = self.config.memory_history_log_noskip_interval;
|
||||
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
|
||||
let mut last_logged_memusage = MemoryStatus::zeroed();
|
||||
|
||||
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
|
||||
let mut can_skip_logs_until = Instant::now() - max_skip;
|
||||
|
||||
for t in 0_u64.. {
|
||||
ticker.tick().await;
|
||||
@@ -115,12 +123,24 @@ impl CgroupWatcher {
|
||||
// equal to the logging interval, we can just log the entire buffer every time we set
|
||||
// the last entry, which also means that for this log line, we can ignore that it's a
|
||||
// ring buffer (because all the entries are in order of increasing time).
|
||||
if i == history_log_len - 1 {
|
||||
//
|
||||
// We skip logging the data if data hasn't meaningfully changed in a while, unless
|
||||
// we've already ignored previous iterations for the last max_skip period.
|
||||
if i == history_log_len - 1
|
||||
&& (now > can_skip_logs_until
|
||||
|| !history_log_buf
|
||||
.iter()
|
||||
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
|
||||
{
|
||||
info!(
|
||||
history = ?MemoryStatus::debug_slice(&history_log_buf),
|
||||
summary = ?summary,
|
||||
"Recent cgroup memory statistics history"
|
||||
);
|
||||
|
||||
can_skip_logs_until = now + max_skip;
|
||||
|
||||
last_logged_memusage = *history_log_buf.last().unwrap();
|
||||
}
|
||||
|
||||
updates
|
||||
@@ -232,6 +252,24 @@ impl MemoryStatus {
|
||||
|
||||
DS(slice)
|
||||
}
|
||||
|
||||
/// Check if the other memory status is a close or similar result.
|
||||
/// Returns true if the larger value is not larger than the smaller value
|
||||
/// by 1/8 of the smaller value, and within 128MiB.
|
||||
/// See tests::check_similarity_behaviour for examples of behaviour
|
||||
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
|
||||
let margin;
|
||||
let diff;
|
||||
if self.non_reclaimable >= other.non_reclaimable {
|
||||
margin = other.non_reclaimable / 8;
|
||||
diff = self.non_reclaimable - other.non_reclaimable;
|
||||
} else {
|
||||
margin = self.non_reclaimable / 8;
|
||||
diff = other.non_reclaimable - self.non_reclaimable;
|
||||
}
|
||||
|
||||
diff < margin && diff < 128 * 1024 * 1024
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -261,4 +299,65 @@ mod tests {
|
||||
assert_eq!(values(2, 4), [9, 0, 1, 2]);
|
||||
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_similarity_behaviour() {
|
||||
// This all accesses private methods, so we can't actually run this
|
||||
// as doctests, because doctests run as an external crate.
|
||||
let mut small = super::MemoryStatus {
|
||||
non_reclaimable: 1024,
|
||||
};
|
||||
let mut large = super::MemoryStatus {
|
||||
non_reclaimable: 1024 * 1024 * 1024 * 1024,
|
||||
};
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// inequality is symmetric
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
small.non_reclaimable = 64;
|
||||
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
|
||||
|
||||
// objects are self-similar, no matter the size
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
// values are similar if the larger value is larger by less than
|
||||
// 12.5%, i.e. 1/8 of the smaller value.
|
||||
// In the example above, large is exactly 12.5% larger, so this doesn't
|
||||
// match.
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
|
||||
// The 1/8 rule only applies up to 128MiB of difference
|
||||
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
|
||||
large.non_reclaimable = small.non_reclaimable / 8 * 9;
|
||||
assert!(small.status_is_close_or_similar(&small));
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// the large value is put just above the threshold
|
||||
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(!small.status_is_close_or_similar(&large));
|
||||
assert!(!large.status_is_close_or_similar(&small));
|
||||
// now below
|
||||
large.non_reclaimable -= 1;
|
||||
assert!(large.status_is_close_or_similar(&large));
|
||||
|
||||
assert!(small.status_is_close_or_similar(&large));
|
||||
assert!(large.status_is_close_or_similar(&small));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,11 @@ use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use tracing::info;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::protocol::{
|
||||
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
|
||||
PROTOCOL_MIN_VERSION,
|
||||
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
|
||||
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
|
||||
};
|
||||
|
||||
/// The central handler for all communications in the monitor.
|
||||
@@ -118,7 +118,12 @@ impl Dispatcher {
|
||||
/// serialize the wrong thing and send it, since `self.sink.send` will take
|
||||
/// any string.
|
||||
pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> {
|
||||
info!(?message, "sending message");
|
||||
if matches!(&message.inner, OutboundMsgKind::HealthCheck { .. }) {
|
||||
debug!(?message, "sending message");
|
||||
} else {
|
||||
info!(?message, "sending message");
|
||||
}
|
||||
|
||||
let json = serde_json::to_string(&message).context("failed to serialize message")?;
|
||||
self.sink
|
||||
.send(Message::Text(json))
|
||||
|
||||
@@ -12,7 +12,7 @@ use axum::extract::ws::{Message, WebSocket};
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::cgroup::{self, CgroupWatcher};
|
||||
use crate::dispatcher::Dispatcher;
|
||||
@@ -474,26 +474,29 @@ impl Runner {
|
||||
// there is a message from the agent
|
||||
msg = self.dispatcher.source.next() => {
|
||||
if let Some(msg) = msg {
|
||||
// Don't use 'message' as a key as the string also uses
|
||||
// that for its key
|
||||
info!(?msg, "received message");
|
||||
match msg {
|
||||
match &msg {
|
||||
Ok(msg) => {
|
||||
let message: InboundMsg = match msg {
|
||||
Message::Text(text) => {
|
||||
serde_json::from_str(&text).context("failed to deserialize text message")?
|
||||
serde_json::from_str(text).context("failed to deserialize text message")?
|
||||
}
|
||||
other => {
|
||||
warn!(
|
||||
// Don't use 'message' as a key as the
|
||||
// string also uses that for its key
|
||||
msg = ?other,
|
||||
"agent should only send text messages but received different type"
|
||||
"problem processing incoming message: agent should only send text messages but received different type"
|
||||
);
|
||||
continue
|
||||
},
|
||||
};
|
||||
|
||||
if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
|
||||
debug!(?msg, "received message");
|
||||
} else {
|
||||
info!(?msg, "received message");
|
||||
}
|
||||
|
||||
let out = match self.process_message(message.clone()).await {
|
||||
Ok(Some(out)) => out,
|
||||
Ok(None) => continue,
|
||||
@@ -517,7 +520,11 @@ impl Runner {
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
}
|
||||
Err(e) => warn!("{e}"),
|
||||
Err(e) => warn!(
|
||||
error = format!("{e}"),
|
||||
msg = ?msg,
|
||||
"received error message"
|
||||
),
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("dispatcher connection closed")
|
||||
|
||||
@@ -60,10 +60,6 @@ pub(crate) enum CalculateSyntheticSizeError {
|
||||
#[error(transparent)]
|
||||
Fatal(anyhow::Error),
|
||||
|
||||
/// The LSN we are trying to calculate a size at no longer exists at the point we query it
|
||||
#[error("Could not find size at {lsn} in timeline {timeline_id}")]
|
||||
LsnNotFound { timeline_id: TimelineId, lsn: Lsn },
|
||||
|
||||
/// Tenant shut down while calculating size
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
@@ -375,9 +371,8 @@ pub(super) async fn gather_inputs(
|
||||
|
||||
/// Augment 'segments' with logical sizes
|
||||
///
|
||||
/// this will probably conflict with on-demand downloaded layers, or at least force them all
|
||||
/// to be downloaded
|
||||
///
|
||||
/// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
|
||||
/// (i.e. we cannot read its logical size at a particular LSN).
|
||||
async fn fill_logical_sizes(
|
||||
timelines: &[Arc<Timeline>],
|
||||
segments: &mut [SegmentMeta],
|
||||
@@ -498,8 +493,6 @@ async fn fill_logical_sizes(
|
||||
|
||||
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
|
||||
seg.segment.size = Some(*size);
|
||||
} else {
|
||||
return Err(CalculateSyntheticSizeError::LsnNotFound { timeline_id, lsn });
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -286,7 +286,6 @@ WalProposerPoll(WalProposer *wp)
|
||||
void
|
||||
WalProposerStart(WalProposer *wp)
|
||||
{
|
||||
|
||||
/* Initiate connections to all safekeeper nodes */
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
|
||||
@@ -63,6 +63,8 @@ char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
static WalproposerShmemState *walprop_shared;
|
||||
static WalProposerConfig walprop_config;
|
||||
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
|
||||
@@ -76,6 +78,7 @@ static HotStandbyFeedback agg_hs_feedback;
|
||||
|
||||
static void nwp_shmem_startup_hook(void);
|
||||
static void nwp_register_gucs(void);
|
||||
static void assign_neon_safekeepers(const char *newval, void *extra);
|
||||
static void nwp_prepare_shmem(void);
|
||||
static uint64 backpressure_lag_impl(void);
|
||||
static bool backpressure_throttling_impl(void);
|
||||
@@ -116,7 +119,8 @@ init_walprop_config(bool syncSafekeepers)
|
||||
{
|
||||
walprop_config.neon_tenant = neon_tenant;
|
||||
walprop_config.neon_timeline = neon_timeline;
|
||||
walprop_config.safekeepers_list = wal_acceptors_list;
|
||||
/* WalProposerCreate scribbles directly on it, so pstrdup */
|
||||
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
|
||||
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
|
||||
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
|
||||
walprop_config.wal_segment_size = wal_segment_size;
|
||||
@@ -156,6 +160,7 @@ WalProposerMain(Datum main_arg)
|
||||
|
||||
init_walprop_config(false);
|
||||
walprop_pg_init_bgworker();
|
||||
am_walproposer = true;
|
||||
walprop_pg_load_libpqwalreceiver();
|
||||
|
||||
wp = WalProposerCreate(&walprop_config, walprop_pg);
|
||||
@@ -194,10 +199,10 @@ nwp_register_gucs(void)
|
||||
NULL, /* long_desc */
|
||||
&wal_acceptors_list, /* valueAddr */
|
||||
"", /* bootValue */
|
||||
PGC_POSTMASTER,
|
||||
PGC_SIGHUP,
|
||||
GUC_LIST_INPUT, /* extensions can't use*
|
||||
* GUC_LIST_QUOTE */
|
||||
NULL, NULL, NULL);
|
||||
NULL, assign_neon_safekeepers, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.safekeeper_reconnect_timeout",
|
||||
@@ -220,6 +225,33 @@ nwp_register_gucs(void)
|
||||
NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
|
||||
* the list changed.
|
||||
*/
|
||||
static void
|
||||
assign_neon_safekeepers(const char *newval, void *extra)
|
||||
{
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
if (!newval) {
|
||||
/* should never happen */
|
||||
wpg_log(FATAL, "neon.safekeepers is empty");
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: restarting through FATAL is stupid and introduces 1s delay before
|
||||
* next bgw start. We should refactor walproposer to allow graceful exit and
|
||||
* thus remove this delay.
|
||||
*/
|
||||
if (strcmp(wal_acceptors_list, newval) != 0)
|
||||
{
|
||||
wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
|
||||
wal_acceptors_list, newval);
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if we need to suspend inserts because of lagging replication. */
|
||||
static uint64
|
||||
backpressure_lag_impl(void)
|
||||
@@ -368,7 +400,7 @@ walprop_register_bgworker(void)
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_restart_time = 1;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
@@ -1238,13 +1270,8 @@ WalSndLoop(WalProposer *wp)
|
||||
{
|
||||
XLogRecPtr flushPtr;
|
||||
|
||||
/* Clear any already-pending wakeups */
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
XLogBroadcastWalProposer(wp);
|
||||
WalProposerPoll(wp);
|
||||
}
|
||||
@@ -1775,6 +1802,20 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
|
||||
late_cv_trigger = ConditionVariableCancelSleep();
|
||||
#endif
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* Process config if requested. This restarts walproposer if safekeepers
|
||||
* list changed. Don't do that for sync-safekeepers because quite probably
|
||||
* it (re-reading config) won't work without some effort, and
|
||||
* sync-safekeepers should be quick to finish anyway.
|
||||
*/
|
||||
if (!wp->config->syncSafekeepers && ConfigReloadPending)
|
||||
{
|
||||
ConfigReloadPending = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
}
|
||||
|
||||
/*
|
||||
* If wait is terminated by latch set (walsenders' latch is set on each
|
||||
* wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
|
||||
|
||||
@@ -46,6 +46,7 @@ tokio = { workspace = true, features = ["fs"] }
|
||||
tokio-util = { workspace = true }
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -13,7 +13,9 @@ use tokio::runtime::Handle;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::task::JoinError;
|
||||
use toml_edit::Document;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
use std::env::{var, VarError};
|
||||
use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::str::FromStr;
|
||||
@@ -287,6 +289,22 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// Load JWT auth token to connect to other safekeepers for pull_timeline.
|
||||
let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
|
||||
Ok(v) => {
|
||||
info!("loaded JWT token for authentication with safekeepers");
|
||||
Some(SecretString::from(v))
|
||||
}
|
||||
Err(VarError::NotPresent) => {
|
||||
info!("no JWT token for authentication with safekeepers detected");
|
||||
None
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("JWT token for authentication with safekeepers is not unicode");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let conf = SafeKeeperConf {
|
||||
workdir,
|
||||
my_id: id,
|
||||
@@ -307,6 +325,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
pg_auth,
|
||||
pg_tenant_only_auth,
|
||||
http_auth,
|
||||
sk_auth_token,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
walsenders_keep_horizon: args.walsenders_keep_horizon,
|
||||
partial_backup_enabled: args.partial_backup_enabled,
|
||||
|
||||
@@ -23,7 +23,7 @@ pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 8;
|
||||
|
||||
// contains persistent metadata for safekeeper
|
||||
const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
// needed to atomically update the state using `rename`
|
||||
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
|
||||
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
|
||||
139
safekeeper/src/http/client.rs
Normal file
139
safekeeper/src/http/client.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
//! Safekeeper http client.
|
||||
//!
|
||||
//! Partially copied from pageserver client; some parts might be better to be
|
||||
//! united.
|
||||
//!
|
||||
//! It would be also good to move it out to separate crate, but this needs
|
||||
//! duplication of internal-but-reported structs like WalSenderState, ServerInfo
|
||||
//! etc.
|
||||
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
id::{TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
use super::routes::TimelineStatus;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
mgmt_api_endpoint: String,
|
||||
authorization_header: Option<SecretString>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
/// Failed to receive body (reqwest error).
|
||||
#[error("receive body: {0}")]
|
||||
ReceiveBody(reqwest::Error),
|
||||
|
||||
/// Status is not ok, but failed to parse body as `HttpErrorBody`.
|
||||
#[error("receive error body: {0}")]
|
||||
ReceiveErrorBody(String),
|
||||
|
||||
/// Status is not ok; parsed error in body as `HttpErrorBody`.
|
||||
#[error("safekeeper API: {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub trait ResponseErrorMessageExt: Sized {
|
||||
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
|
||||
}
|
||||
|
||||
/// If status is not ok, try to extract error message from the body.
|
||||
impl ResponseErrorMessageExt for reqwest::Response {
|
||||
async fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
let url = self.url().to_owned();
|
||||
Err(match self.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
|
||||
Err(_) => {
|
||||
Error::ReceiveErrorBody(format!("http error ({}) at {}.", status.as_u16(), url))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(mgmt_api_endpoint: String, jwt: Option<SecretString>) -> Self {
|
||||
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
|
||||
}
|
||||
|
||||
pub fn from_client(
|
||||
client: reqwest::Client,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
mgmt_api_endpoint,
|
||||
authorization_header: jwt
|
||||
.map(|jwt| SecretString::from(format!("Bearer {}", jwt.get_contents()))),
|
||||
client,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn timeline_status(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineStatus> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.get(&uri).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn snapshot(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<reqwest::Response> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/snapshot",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
|
||||
self.request(Method::GET, uri, ()).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good.
|
||||
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Just send the request.
|
||||
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let req = self.client.request(method, uri);
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
} else {
|
||||
req
|
||||
};
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod client;
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
|
||||
@@ -1,38 +1,25 @@
|
||||
use hyper::{Body, Request, Response, StatusCode, Uri};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
use utils::http::request::parse_query_param;
|
||||
|
||||
use std::io::Write as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
|
||||
use crate::debug_dump::TimelineDigestRequest;
|
||||
use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{ServerInfo, TermLsn};
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::TimelineCreateRequest;
|
||||
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
http::{
|
||||
@@ -46,7 +33,16 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::models::TimelineCreateRequest;
|
||||
use crate::debug_dump::TimelineDigestRequest;
|
||||
use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{ServerInfo, TermLsn};
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SafekeeperStatus {
|
||||
@@ -199,13 +195,50 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let data: pull_timeline::Request = json_request(&mut request).await?;
|
||||
let conf = get_conf(&request);
|
||||
|
||||
let resp = pull_timeline::handle_request(data)
|
||||
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
/// Stream tar archive with all timeline data.
|
||||
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
// Note: with evicted timelines it should work better then de-evict them and
|
||||
// stream; probably start_snapshot would copy partial s3 file to dest path
|
||||
// and stream control file, or return FullAccessTimeline if timeline is not
|
||||
// evicted.
|
||||
let tli = tli
|
||||
.full_access_guard()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
|
||||
// so create the chan and write to it in another task.
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
|
||||
task::spawn(pull_timeline::stream_snapshot(tli, tx));
|
||||
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
let body = Body::wrap_stream(rx_stream);
|
||||
|
||||
let response = Response::builder()
|
||||
.status(200)
|
||||
.header(hyper::header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
@@ -260,41 +293,6 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Download a file from the timeline directory.
|
||||
// TODO: figure out a better way to copy files between safekeepers
|
||||
async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let filename: String = parse_request_param(&request, "filename")?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let tli = tli
|
||||
.full_access_guard()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let filepath = tli.get_timeline_dir().join(filename);
|
||||
let mut file = File::open(&filepath)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
let mut content = Vec::new();
|
||||
// TODO: don't store files in memory
|
||||
file.read_to_end(&mut content)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/octet-stream")
|
||||
.body(Body::from(content))
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
}
|
||||
|
||||
/// Force persist control file.
|
||||
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
@@ -566,13 +564,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
|
||||
|r| request_span(r, timeline_snapshot_handler),
|
||||
)
|
||||
.post("/v1/pull_timeline", |r| {
|
||||
request_span(r, timeline_pull_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|
||||
|r| request_span(r, timeline_files_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|
||||
|r| request_span(r, timeline_copy_handler),
|
||||
|
||||
@@ -7,7 +7,7 @@ use tokio::runtime::Runtime;
|
||||
use std::time::Duration;
|
||||
use storage_broker::Uri;
|
||||
|
||||
use utils::{auth::SwappableJwtAuth, id::NodeId};
|
||||
use utils::{auth::SwappableJwtAuth, id::NodeId, logging::SecretString};
|
||||
|
||||
mod auth;
|
||||
pub mod broker;
|
||||
@@ -78,6 +78,8 @@ pub struct SafeKeeperConf {
|
||||
pub pg_auth: Option<Arc<JwtAuth>>,
|
||||
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
|
||||
pub http_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
/// JWT token to connect to other safekeepers with.
|
||||
pub sk_auth_token: Option<SecretString>,
|
||||
pub current_thread_runtime: bool,
|
||||
pub walsenders_keep_horizon: bool,
|
||||
pub partial_backup_enabled: bool,
|
||||
@@ -114,6 +116,7 @@ impl SafeKeeperConf {
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
sk_auth_token: None,
|
||||
heartbeat_timeout: Duration::new(5, 0),
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
|
||||
@@ -1,28 +1,244 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
cmp::min,
|
||||
io::{self, ErrorKind},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{
|
||||
fs::{File, OpenOptions},
|
||||
io::AsyncWrite,
|
||||
sync::mpsc,
|
||||
task,
|
||||
};
|
||||
use tokio_tar::{Archive, Builder};
|
||||
use tokio_util::{
|
||||
io::{CopyToBytes, SinkWriter},
|
||||
sync::PollSender,
|
||||
};
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tracing::info;
|
||||
use crate::{
|
||||
control_file::{self, CONTROL_FILE_NAME},
|
||||
debug_dump,
|
||||
http::{
|
||||
client::{self, Client},
|
||||
routes::TimelineStatus,
|
||||
},
|
||||
safekeeper::Term,
|
||||
timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
|
||||
wal_storage::{self, open_wal_file, Storage},
|
||||
GlobalTimelines, SafeKeeperConf,
|
||||
};
|
||||
use utils::{
|
||||
crashsafe::{durable_rename, fsync_async_opt},
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
logging::SecretString,
|
||||
lsn::Lsn,
|
||||
pausable_failpoint,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
control_file, debug_dump,
|
||||
http::routes::TimelineStatus,
|
||||
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError},
|
||||
wal_storage::{self, Storage},
|
||||
GlobalTimelines, SafeKeeperConf,
|
||||
};
|
||||
/// Stream tar archive of timeline to tx.
|
||||
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
|
||||
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
|
||||
// Error type/contents don't matter as they won't can't reach the client
|
||||
// (hyper likely doesn't do anything with it), but http stream will be
|
||||
// prematurely terminated. It would be nice to try to send the error in
|
||||
// trailers though.
|
||||
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
|
||||
error!("snapshot failed: {:#}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/// Info about timeline on safekeeper ready for reporting.
|
||||
/// State needed while streaming the snapshot.
|
||||
pub struct SnapshotContext {
|
||||
pub from_segno: XLogSegNo, // including
|
||||
pub upto_segno: XLogSegNo, // including
|
||||
pub term: Term,
|
||||
pub last_log_term: Term,
|
||||
pub flush_lsn: Lsn,
|
||||
pub wal_seg_size: usize,
|
||||
// used to remove WAL hold off in Drop.
|
||||
pub tli: FullAccessTimeline,
|
||||
}
|
||||
|
||||
impl Drop for SnapshotContext {
|
||||
fn drop(&mut self) {
|
||||
let tli = self.tli.clone();
|
||||
task::spawn(async move {
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
shared_state.wal_removal_on_hold = false;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_snapshot_guts(
|
||||
tli: FullAccessTimeline,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) -> Result<()> {
|
||||
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
|
||||
// use SinkWriter as a Write impl. That is,
|
||||
// - create Sink from the tx. It returns PollSendError if chan is closed.
|
||||
let sink = PollSender::new(tx);
|
||||
// - SinkWriter needs sink error to be io one, map it.
|
||||
let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
|
||||
// - SinkWriter wants sink type to be just Bytes, not Result<Bytes>, so map
|
||||
// it with with(). Note that with() accepts async function which we don't
|
||||
// need and allows the map to fail, which we don't need either, but hence
|
||||
// two Oks.
|
||||
let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
|
||||
// - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
|
||||
// into CopyToBytes. This is a data copy.
|
||||
let copy_to_bytes = CopyToBytes::new(oksink);
|
||||
let mut writer = SinkWriter::new(copy_to_bytes);
|
||||
let pinned_writer = std::pin::pin!(writer);
|
||||
|
||||
// Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
|
||||
// which is also likely suboptimal.
|
||||
let mut ar = Builder::new_non_terminated(pinned_writer);
|
||||
|
||||
let bctx = tli.start_snapshot(&mut ar).await?;
|
||||
pausable_failpoint!("sk-snapshot-after-list-pausable");
|
||||
|
||||
let tli_dir = tli.get_timeline_dir();
|
||||
info!(
|
||||
"sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
|
||||
bctx.upto_segno - bctx.from_segno + 1,
|
||||
bctx.from_segno,
|
||||
bctx.upto_segno,
|
||||
bctx.term,
|
||||
bctx.last_log_term,
|
||||
bctx.flush_lsn,
|
||||
);
|
||||
for segno in bctx.from_segno..=bctx.upto_segno {
|
||||
let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
|
||||
let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
|
||||
if is_partial {
|
||||
wal_file_name.push_str(".partial");
|
||||
}
|
||||
ar.append_file(&wal_file_name, &mut sf).await?;
|
||||
}
|
||||
|
||||
// Do the term check before ar.finish to make archive corrupted in case of
|
||||
// term change. Client shouldn't ignore abrupt stream end, but to be sure.
|
||||
tli.finish_snapshot(&bctx).await?;
|
||||
|
||||
ar.finish().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl FullAccessTimeline {
|
||||
/// Start streaming tar archive with timeline:
|
||||
/// 1) stream control file under lock;
|
||||
/// 2) hold off WAL removal;
|
||||
/// 3) collect SnapshotContext to understand which WAL segments should be
|
||||
/// streamed.
|
||||
///
|
||||
/// Snapshot streams data up to flush_lsn. To make this safe, we must check
|
||||
/// that term doesn't change during the procedure, or we risk sending mix of
|
||||
/// WAL from different histories. Term is remembered in the SnapshotContext
|
||||
/// and checked in finish_snapshot. Note that in the last segment some WAL
|
||||
/// higher than flush_lsn set here might be streamed; that's fine as long as
|
||||
/// terms doesn't change.
|
||||
///
|
||||
/// Alternatively we could send only up to commit_lsn to get some valid
|
||||
/// state which later will be recovered by compute, in this case term check
|
||||
/// is not needed, but we likely don't want that as there might be no
|
||||
/// compute which could perform the recovery.
|
||||
///
|
||||
/// When returned SnapshotContext is dropped WAL hold is removed.
|
||||
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
|
||||
&self,
|
||||
ar: &mut tokio_tar::Builder<W>,
|
||||
) -> Result<SnapshotContext> {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
|
||||
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
|
||||
let mut cf = File::open(cf_path).await?;
|
||||
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
|
||||
|
||||
// We need to stream since the oldest segment someone (s3 or pageserver)
|
||||
// still needs. This duplicates calc_horizon_lsn logic.
|
||||
//
|
||||
// We know that WAL wasn't removed up to this point because it cannot be
|
||||
// removed further than `backup_lsn`. Since we're holding shared_state
|
||||
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
|
||||
// won't be removed until we're done.
|
||||
let from_lsn = min(
|
||||
shared_state.sk.state.remote_consistent_lsn,
|
||||
shared_state.sk.state.backup_lsn,
|
||||
);
|
||||
if from_lsn == Lsn::INVALID {
|
||||
// this is possible if snapshot is called before handling first
|
||||
// elected message
|
||||
bail!("snapshot is called on uninitialized timeline");
|
||||
}
|
||||
let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
|
||||
let term = shared_state.sk.get_term();
|
||||
let last_log_term = shared_state.sk.get_last_log_term();
|
||||
let flush_lsn = shared_state.sk.flush_lsn();
|
||||
let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
|
||||
// have some limit on max number of segments as a sanity check
|
||||
const MAX_ALLOWED_SEGS: u64 = 1000;
|
||||
let num_segs = upto_segno - from_segno + 1;
|
||||
if num_segs > MAX_ALLOWED_SEGS {
|
||||
bail!(
|
||||
"snapshot is called on timeline with {} segments, but the limit is {}",
|
||||
num_segs,
|
||||
MAX_ALLOWED_SEGS
|
||||
);
|
||||
}
|
||||
|
||||
// Prevent WAL removal while we're streaming data.
|
||||
//
|
||||
// Since this a flag, not a counter just bail out if already set; we
|
||||
// shouldn't need concurrent snapshotting.
|
||||
if shared_state.wal_removal_on_hold {
|
||||
bail!("wal_removal_on_hold is already true");
|
||||
}
|
||||
shared_state.wal_removal_on_hold = true;
|
||||
|
||||
let bctx = SnapshotContext {
|
||||
from_segno,
|
||||
upto_segno,
|
||||
term,
|
||||
last_log_term,
|
||||
flush_lsn,
|
||||
wal_seg_size: shared_state.get_wal_seg_size(),
|
||||
tli: self.clone(),
|
||||
};
|
||||
|
||||
Ok(bctx)
|
||||
}
|
||||
|
||||
/// Finish snapshotting: check that term(s) hasn't changed.
|
||||
///
|
||||
/// Note that WAL gc hold off is removed in Drop of SnapshotContext to not
|
||||
/// forget this if snapshotting fails mid the way.
|
||||
pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
|
||||
let shared_state = self.read_shared_state().await;
|
||||
let term = shared_state.sk.get_term();
|
||||
let last_log_term = shared_state.sk.get_last_log_term();
|
||||
// There are some cases to relax this check (e.g. last_log_term might
|
||||
// change, but as long as older history is strictly part of new that's
|
||||
// fine), but there is no need to do it.
|
||||
if bctx.term != term || bctx.last_log_term != last_log_term {
|
||||
bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
|
||||
bctx.term, bctx.last_log_term, term, last_log_term);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// pull_timeline request body.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Request {
|
||||
pub tenant_id: TenantId,
|
||||
@@ -48,7 +264,10 @@ pub struct DebugDumpResponse {
|
||||
}
|
||||
|
||||
/// Find the most advanced safekeeper and pull timeline from it.
|
||||
pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
pub async fn handle_request(
|
||||
request: Request,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
) -> Result<Response> {
|
||||
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
@@ -57,28 +276,26 @@ pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
bail!("Timeline {} already exists", request.timeline_id);
|
||||
}
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let http_hosts = request.http_hosts.clone();
|
||||
|
||||
// Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
|
||||
let responses = futures::future::join_all(http_hosts.iter().map(|url| {
|
||||
let url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
url, request.tenant_id, request.timeline_id
|
||||
);
|
||||
client.get(url).send()
|
||||
}))
|
||||
.await;
|
||||
// Figure out statuses of potential donors.
|
||||
let responses: Vec<Result<TimelineStatus, client::Error>> =
|
||||
futures::future::join_all(http_hosts.iter().map(|url| async {
|
||||
let cclient = Client::new(url.clone(), sk_auth_token.clone());
|
||||
let info = cclient
|
||||
.timeline_status(request.tenant_id, request.timeline_id)
|
||||
.await?;
|
||||
Ok(info)
|
||||
}))
|
||||
.await;
|
||||
|
||||
let mut statuses = Vec::new();
|
||||
for (i, response) in responses.into_iter().enumerate() {
|
||||
let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
|
||||
let status: crate::http::routes::TimelineStatus = response.json().await?;
|
||||
let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
|
||||
statuses.push((status, i));
|
||||
}
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
// TODO: current logic may be wrong, fix it later
|
||||
let (status, i) = statuses
|
||||
.into_iter()
|
||||
.max_by_key(|(status, _)| {
|
||||
@@ -94,10 +311,14 @@ pub async fn handle_request(request: Request) -> Result<Response> {
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(status, safekeeper_host).await
|
||||
pull_timeline(status, safekeeper_host, sk_auth_token).await
|
||||
}
|
||||
|
||||
async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
|
||||
async fn pull_timeline(
|
||||
status: TimelineStatus,
|
||||
host: String,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
) -> Result<Response> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
"pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
|
||||
@@ -111,95 +332,53 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
|
||||
let conf = &GlobalTimelines::get_global_config();
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
// TODO: don't use debug dump, it should be used only in tests.
|
||||
// This is a proof of concept, we should figure out a way
|
||||
// to use scp without implementing it manually.
|
||||
|
||||
// Implementing our own scp over HTTP.
|
||||
// At first, we need to fetch list of files from safekeeper.
|
||||
let dump: DebugDumpResponse = client
|
||||
.get(format!(
|
||||
"{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
|
||||
host, status.tenant_id, status.timeline_id
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
|
||||
if dump.timelines.len() != 1 {
|
||||
bail!(
|
||||
"expected to fetch single timeline, got {} timelines",
|
||||
dump.timelines.len()
|
||||
);
|
||||
}
|
||||
|
||||
let timeline = dump.timelines.into_iter().next().unwrap();
|
||||
let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
|
||||
"timeline {} doesn't have disk content",
|
||||
ttid
|
||||
))?;
|
||||
|
||||
let mut filenames = disk_content
|
||||
.files
|
||||
.iter()
|
||||
.map(|file| file.name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort filenames to make sure we pull files in correct order
|
||||
// After sorting, we should have:
|
||||
// - 000000010000000000000001
|
||||
// - ...
|
||||
// - 000000010000000000000002.partial
|
||||
// - safekeeper.control
|
||||
filenames.sort();
|
||||
|
||||
// safekeeper.control should be the first file, so we need to move it to the beginning
|
||||
let control_file_index = filenames
|
||||
.iter()
|
||||
.position(|name| name == "safekeeper.control")
|
||||
.ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
|
||||
filenames.remove(control_file_index);
|
||||
filenames.insert(0, "safekeeper.control".to_string());
|
||||
|
||||
pausable_failpoint!("sk-pull-timeline-after-list-pausable");
|
||||
|
||||
info!(
|
||||
"downloading {} files from safekeeper {}",
|
||||
filenames.len(),
|
||||
host
|
||||
);
|
||||
|
||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||
|
||||
// Note: some time happens between fetching list of files and fetching files themselves.
|
||||
// It's possible that some files will be removed from safekeeper and we will fail to fetch them.
|
||||
// This function will fail in this case, should be retried by the caller.
|
||||
for filename in filenames {
|
||||
let file_path = tli_dir_path.join(&filename);
|
||||
// /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
|
||||
let http_url = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/file/{}",
|
||||
host, status.tenant_id, status.timeline_id, filename
|
||||
);
|
||||
let client = Client::new(host.clone(), sk_auth_token.clone());
|
||||
// Request stream with basebackup archive.
|
||||
let bb_resp = client
|
||||
.snapshot(status.tenant_id, status.timeline_id)
|
||||
.await?;
|
||||
|
||||
let mut file = tokio::fs::File::create(&file_path).await?;
|
||||
let mut response = client.get(&http_url).send().await?;
|
||||
if response.status() != reqwest::StatusCode::OK {
|
||||
bail!(
|
||||
"pulling file {} failed: status is {}",
|
||||
filename,
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
file.write_all(&chunk).await?;
|
||||
file.flush().await?;
|
||||
// Make Stream of Bytes from it...
|
||||
let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
|
||||
// and turn it into StreamReader implementing AsyncRead.
|
||||
let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
|
||||
|
||||
// Extract it on the fly to the disk. We don't use simple unpack() to fsync
|
||||
// files.
|
||||
let mut entries = Archive::new(bb_reader).entries()?;
|
||||
while let Some(base_tar_entry) = entries.next().await {
|
||||
let mut entry = base_tar_entry?;
|
||||
let header = entry.header();
|
||||
let file_path = header.path()?.into_owned();
|
||||
match header.entry_type() {
|
||||
tokio_tar::EntryType::Regular => {
|
||||
let utf8_file_path =
|
||||
Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
|
||||
let dst_path = tli_dir_path.join(utf8_file_path);
|
||||
let mut f = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(&dst_path)
|
||||
.await?;
|
||||
tokio::io::copy(&mut entry, &mut f).await?;
|
||||
// fsync the file
|
||||
f.sync_all().await?;
|
||||
}
|
||||
_ => {
|
||||
bail!(
|
||||
"entry {} in backup tar archive is of unexpected type: {:?}",
|
||||
file_path.display(),
|
||||
header.entry_type()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: fsync?
|
||||
// fsync temp timeline directory to remember its contents.
|
||||
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
|
||||
|
||||
// Let's create timeline from temp directory and verify that it's correct
|
||||
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
|
||||
@@ -290,7 +469,9 @@ pub async fn load_temp_timeline(
|
||||
ttid, tmp_path, timeline_path
|
||||
);
|
||||
tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
|
||||
tokio::fs::rename(tmp_path, &timeline_path).await?;
|
||||
// fsync tenant dir creation
|
||||
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
|
||||
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
|
||||
|
||||
let tli = GlobalTimelines::load_timeline(&guard, ttid)
|
||||
.await
|
||||
|
||||
@@ -780,6 +780,9 @@ where
|
||||
|
||||
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
|
||||
state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
|
||||
// similar for remote_consistent_lsn
|
||||
state.remote_consistent_lsn =
|
||||
max(state.remote_consistent_lsn, state.timeline_start_lsn);
|
||||
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.state.finish_change(&state).await?;
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
use tokio::fs::{self};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantId;
|
||||
|
||||
@@ -168,6 +168,9 @@ pub struct SharedState {
|
||||
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
|
||||
/// In memory list containing state of peers sent in latest messages from them.
|
||||
pub(crate) peers_info: PeersInfo,
|
||||
// True value hinders old WAL removal; this is used by snapshotting. We
|
||||
// could make it a counter, but there is no need to.
|
||||
pub(crate) wal_removal_on_hold: bool,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -205,6 +208,7 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_removal_on_hold: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -222,10 +226,11 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_removal_on_hold: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn get_wal_seg_size(&self) -> usize {
|
||||
pub(crate) fn get_wal_seg_size(&self) -> usize {
|
||||
self.sk.state.server.wal_seg_size as usize
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ pub struct StateSnapshot {
|
||||
// misc
|
||||
pub cfile_last_persist_at: Instant,
|
||||
pub inmem_flush_pending: bool,
|
||||
pub wal_removal_on_hold: bool,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
}
|
||||
|
||||
@@ -54,6 +55,7 @@ impl StateSnapshot {
|
||||
cfile_backup_lsn: read_guard.sk.state.backup_lsn,
|
||||
cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
|
||||
inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
|
||||
wal_removal_on_hold: read_guard.wal_removal_on_hold,
|
||||
peers: read_guard.get_peers(heartbeat_timeout),
|
||||
}
|
||||
}
|
||||
@@ -324,8 +326,8 @@ async fn update_wal_removal(
|
||||
last_removed_segno: u64,
|
||||
wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
|
||||
) {
|
||||
if wal_removal_task.is_some() {
|
||||
// WAL removal is already in progress
|
||||
if wal_removal_task.is_some() || state.wal_removal_on_hold {
|
||||
// WAL removal is already in progress or hold off
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -684,13 +684,12 @@ impl WalReader {
|
||||
let xlogoff = self.pos.segment_offset(self.wal_seg_size);
|
||||
let segno = self.pos.segment_number(self.wal_seg_size);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
|
||||
let wal_file_path = self.timeline_dir.join(&wal_file_name);
|
||||
|
||||
// Try to open local file, if we may have WAL locally
|
||||
if self.pos >= self.local_start_lsn {
|
||||
let res = Self::open_wal_file(&wal_file_path).await;
|
||||
let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
|
||||
match res {
|
||||
Ok(mut file) => {
|
||||
Ok((mut file, _)) => {
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
return Ok(Box::pin(file));
|
||||
}
|
||||
@@ -718,25 +717,6 @@ impl WalReader {
|
||||
|
||||
bail!("WAL segment is not found")
|
||||
}
|
||||
|
||||
/// Helper function for opening a wal file.
|
||||
async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
|
||||
return Ok(opened_file);
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
tokio::fs::File::open(&wal_file_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
warn!("{}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Zero block for filling created WAL segments.
|
||||
@@ -758,6 +738,34 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function for opening WAL segment `segno` in `dir`. Returns file and
|
||||
/// whether it is .partial.
|
||||
pub(crate) async fn open_wal_file(
|
||||
timeline_dir: &Utf8Path,
|
||||
segno: XLogSegNo,
|
||||
wal_seg_size: usize,
|
||||
) -> Result<(tokio::fs::File, bool)> {
|
||||
let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size)?;
|
||||
|
||||
// First try to open the .partial file.
|
||||
let mut partial_path = wal_file_path.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
|
||||
return Ok((opened_file, true));
|
||||
}
|
||||
|
||||
// If that failed, try it without the .partial extension.
|
||||
let pf = tokio::fs::File::open(&wal_file_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
|
||||
.map_err(|e| {
|
||||
warn!("{}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
Ok((pf, false))
|
||||
}
|
||||
|
||||
/// Helper returning full path to WAL segment file and its .partial brother.
|
||||
pub fn wal_file_paths(
|
||||
timeline_dir: &Utf8Path,
|
||||
|
||||
@@ -174,6 +174,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
sk_auth_token: None,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_enabled: false,
|
||||
|
||||
@@ -314,7 +314,7 @@ impl ComputeHook {
|
||||
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
|
||||
endpoint
|
||||
.reconfigure(compute_pageservers.clone(), *stripe_size)
|
||||
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1914,6 +1914,7 @@ class NeonCli(AbstractNeonCli):
|
||||
endpoint_id: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
check_return_code=True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = ["endpoint", "reconfigure", endpoint_id]
|
||||
@@ -1921,6 +1922,8 @@ class NeonCli(AbstractNeonCli):
|
||||
args.extend(["--tenant-id", str(tenant_id)])
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
if safekeepers is not None:
|
||||
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def endpoint_stop(
|
||||
@@ -3407,6 +3410,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.pg_port = pg_port
|
||||
self.http_port = http_port
|
||||
self.check_stop_result = check_stop_result
|
||||
# passed to endpoint create and endpoint reconfigure
|
||||
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
@@ -3469,6 +3473,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
allow_multiple: bool = False,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
@@ -3478,6 +3483,11 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
assert self.endpoint_id is not None
|
||||
|
||||
# If `safekeepers` is not None, they are remember them as active and use
|
||||
# in the following commands.
|
||||
if safekeepers is not None:
|
||||
self.active_safekeepers = safekeepers
|
||||
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
self.env.neon_cli.endpoint_start(
|
||||
@@ -3538,9 +3548,17 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
if self.running:
|
||||
self.safe_psql("SELECT pg_reload_conf()")
|
||||
|
||||
def reconfigure(self, pageserver_id: Optional[int] = None):
|
||||
def reconfigure(
|
||||
self, pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None
|
||||
):
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
|
||||
# If `safekeepers` is not None, they are remember them as active and use
|
||||
# in the following commands.
|
||||
if safekeepers is not None:
|
||||
self.active_safekeepers = safekeepers
|
||||
self.env.neon_cli.endpoint_reconfigure(
|
||||
self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
|
||||
)
|
||||
|
||||
def respec(self, **kwargs):
|
||||
"""Update the endpoint.json file used by control_plane."""
|
||||
@@ -3847,7 +3865,15 @@ class Safekeeper(LogUtils):
|
||||
assert isinstance(res, dict)
|
||||
return res
|
||||
|
||||
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
|
||||
def http_client(
|
||||
self, auth_token: Optional[str] = None, gen_sk_wide_token: bool = True
|
||||
) -> SafekeeperHttpClient:
|
||||
"""
|
||||
When auth_token is None but gen_sk_wide is True creates safekeeper wide
|
||||
token, which is a reasonable default.
|
||||
"""
|
||||
if auth_token is None and gen_sk_wide_token:
|
||||
auth_token = self.env.auth_keys.generate_safekeeper_token()
|
||||
is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
|
||||
return SafekeeperHttpClient(
|
||||
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
|
||||
@@ -3897,11 +3923,13 @@ class Safekeeper(LogUtils):
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
|
||||
def checkpoint_up_to(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, wait_wal_removal=True
|
||||
):
|
||||
"""
|
||||
Assuming pageserver(s) uploaded to s3 up to `lsn`,
|
||||
1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it.
|
||||
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN.
|
||||
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN; optionally wait for that.
|
||||
"""
|
||||
cli = self.http_client()
|
||||
|
||||
@@ -3925,7 +3953,8 @@ class Safekeeper(LogUtils):
|
||||
# pageserver to this safekeeper
|
||||
wait_until(30, 1, are_lsns_advanced)
|
||||
cli.checkpoint(tenant_id, timeline_id)
|
||||
wait_until(30, 1, are_segments_removed)
|
||||
if wait_wal_removal:
|
||||
wait_until(30, 1, are_segments_removed)
|
||||
|
||||
def wait_until_paused(self, failpoint: str):
|
||||
msg = f"at failpoint {failpoint}"
|
||||
@@ -4447,6 +4476,7 @@ def wait_for_last_flush_lsn(
|
||||
tenant: TenantId,
|
||||
timeline: TimelineId,
|
||||
pageserver_id: Optional[int] = None,
|
||||
auth_token: Optional[str] = None,
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
|
||||
@@ -4460,7 +4490,7 @@ def wait_for_last_flush_lsn(
|
||||
f"wait_for_last_flush_lsn: waiting for {last_flush_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
|
||||
)
|
||||
waited = wait_for_last_record_lsn(
|
||||
pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
|
||||
pageserver.http_client(auth_token=auth_token), tenant_shard_id, timeline, last_flush_lsn
|
||||
)
|
||||
|
||||
assert waited >= last_flush_lsn
|
||||
@@ -4556,6 +4586,7 @@ def last_flush_lsn_upload(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
pageserver_id: Optional[int] = None,
|
||||
auth_token: Optional[str] = None,
|
||||
) -> Lsn:
|
||||
"""
|
||||
Wait for pageserver to catch to the latest flush LSN of given endpoint,
|
||||
@@ -4563,11 +4594,11 @@ def last_flush_lsn_upload(
|
||||
reaching flush LSN).
|
||||
"""
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id
|
||||
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id, auth_token=auth_token
|
||||
)
|
||||
shards = tenant_get_shards(env, tenant_id, pageserver_id)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
ps_http = pageserver.http_client()
|
||||
ps_http = pageserver.http_client(auth_token=auth_token)
|
||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
|
||||
|
||||
@@ -75,9 +75,6 @@ def test_metric_collection(
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*metrics endpoint refused the sent metrics*",
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*",
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes",
|
||||
".*metrics_collection: failed to upload to S3: Failed to upload data of length .* to storage path.*",
|
||||
]
|
||||
)
|
||||
@@ -238,9 +235,6 @@ def test_metric_collection_cleans_up_tempfile(
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*metrics endpoint refused the sent metrics*",
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*",
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -317,9 +317,9 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
|
||||
time.sleep(1)
|
||||
|
||||
# Ensure that safekeepers don't lose remote_consistent_lsn on restart.
|
||||
# Control file is persisted each 5s. TODO: do that on shutdown and remove sleep.
|
||||
time.sleep(6)
|
||||
for sk in env.safekeepers:
|
||||
# force persist cfile
|
||||
sk.http_client().checkpoint(tenant_id, timeline_id)
|
||||
sk.stop()
|
||||
sk.start()
|
||||
stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
|
||||
@@ -374,7 +374,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
http_cli_other = env.safekeepers[0].http_client(
|
||||
auth_token=env.auth_keys.generate_tenant_token(TenantId.generate())
|
||||
)
|
||||
http_cli_noauth = env.safekeepers[0].http_client()
|
||||
http_cli_noauth = env.safekeepers[0].http_client(gen_sk_wide_token=False)
|
||||
|
||||
# Pretend WAL is offloaded to s3.
|
||||
if auth_enabled:
|
||||
@@ -830,7 +830,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
auth_token=env.auth_keys.generate_tenant_token(TenantId.generate())
|
||||
)
|
||||
wa_http_cli_bad.check_status()
|
||||
wa_http_cli_noauth = wa.http_client()
|
||||
wa_http_cli_noauth = wa.http_client(gen_sk_wide_token=False)
|
||||
wa_http_cli_noauth.check_status()
|
||||
|
||||
# debug endpoint requires safekeeper scope
|
||||
@@ -964,7 +964,7 @@ def test_sk_auth(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# By default, neon_local enables auth on all services if auth is configured,
|
||||
# so http must require the token.
|
||||
sk_http_cli_noauth = sk.http_client()
|
||||
sk_http_cli_noauth = sk.http_client(gen_sk_wide_token=False)
|
||||
sk_http_cli_auth = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
|
||||
with pytest.raises(sk_http_cli_noauth.HTTPError, match="Forbidden|Unauthorized"):
|
||||
sk_http_cli_noauth.timeline_status(tenant_id, timeline_id)
|
||||
@@ -1640,7 +1640,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
sk_http_other = sk.http_client(
|
||||
auth_token=env.auth_keys.generate_tenant_token(tenant_id_other)
|
||||
)
|
||||
sk_http_noauth = sk.http_client()
|
||||
sk_http_noauth = sk.http_client(gen_sk_wide_token=False)
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_1)).is_dir()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir()
|
||||
assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir()
|
||||
@@ -1723,7 +1723,13 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
cur.execute("INSERT INTO t (key) VALUES (123)")
|
||||
|
||||
|
||||
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
# Basic pull_timeline test.
|
||||
# When live_sk_change is False, compute is restarted to change set of
|
||||
# safekeepers; otherwise it is live reload.
|
||||
@pytest.mark.parametrize("live_sk_change", [False, True])
|
||||
def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
|
||||
neon_env_builder.auth_enabled = True
|
||||
|
||||
def execute_payload(endpoint: Endpoint):
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -1739,7 +1745,7 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
|
||||
for sk in safekeepers:
|
||||
http_cli = sk.http_client()
|
||||
http_cli = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
|
||||
try:
|
||||
status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"Safekeeper {sk.id} status: {status}")
|
||||
@@ -1749,13 +1755,12 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 4
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_pull_timeline")
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
log.info("Use only first 3 safekeepers")
|
||||
env.safekeepers[3].stop()
|
||||
endpoint = env.endpoints.create("test_pull_timeline")
|
||||
endpoint.active_safekeepers = [1, 2, 3]
|
||||
endpoint.start()
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.start(safekeepers=[1, 2, 3])
|
||||
|
||||
execute_payload(endpoint)
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
@@ -1767,29 +1772,22 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
|
||||
env.safekeepers[3].start()
|
||||
|
||||
res = (
|
||||
env.safekeepers[3]
|
||||
.http_client()
|
||||
.pull_timeline(
|
||||
{
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"http_hosts": [
|
||||
f"http://localhost:{env.safekeepers[0].port.http}",
|
||||
f"http://localhost:{env.safekeepers[2].port.http}",
|
||||
],
|
||||
}
|
||||
)
|
||||
res = env.safekeepers[3].pull_timeline(
|
||||
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
|
||||
)
|
||||
log.info("Finished pulling timeline")
|
||||
log.info(res)
|
||||
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
log.info("Restarting compute with new config to verify that it works")
|
||||
endpoint.stop_and_destroy().create("test_pull_timeline")
|
||||
endpoint.active_safekeepers = [1, 3, 4]
|
||||
endpoint.start()
|
||||
action = "reconfiguing" if live_sk_change else "restarting"
|
||||
log.info(f"{action} compute with new config to verify that it works")
|
||||
new_sks = [1, 3, 4]
|
||||
if not live_sk_change:
|
||||
endpoint.stop_and_destroy().create("main")
|
||||
endpoint.start(safekeepers=new_sks)
|
||||
else:
|
||||
endpoint.reconfigure(safekeepers=new_sks)
|
||||
|
||||
execute_payload(endpoint)
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
@@ -1816,8 +1814,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
# 4) Do some write, verify integrity with timeline_digest.
|
||||
# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL
|
||||
# segment is not implemented.
|
||||
@pytest.mark.xfail
|
||||
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -1836,27 +1834,36 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
|
||||
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
|
||||
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
|
||||
|
||||
dst_http = dst_sk.http_client()
|
||||
src_http = src_sk.http_client()
|
||||
# run pull_timeline which will halt before downloading files
|
||||
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
|
||||
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
|
||||
pt_handle = PropagatingThread(
|
||||
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
|
||||
)
|
||||
pt_handle.start()
|
||||
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
|
||||
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
|
||||
|
||||
# ensure segment exists
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
|
||||
lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
lsn = last_flush_lsn_upload(
|
||||
env,
|
||||
endpoint,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
auth_token=env.auth_keys.generate_tenant_token(tenant_id),
|
||||
)
|
||||
assert lsn > Lsn("0/2000000")
|
||||
# Checkpoint timeline beyond lsn.
|
||||
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
|
||||
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=False)
|
||||
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
|
||||
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
|
||||
|
||||
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
|
||||
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
|
||||
pt_handle.join()
|
||||
|
||||
# after pull_timeline is finished WAL should be removed on donor
|
||||
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=True)
|
||||
|
||||
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
|
||||
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
|
||||
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
|
||||
@@ -1883,8 +1890,8 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
|
||||
# enough, so it won't be affected by term change anymore.
|
||||
#
|
||||
# Expected to fail while term check is not implemented.
|
||||
@pytest.mark.xfail
|
||||
def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -1900,14 +1907,14 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
|
||||
ep.safe_psql("create table t(key int, value text)")
|
||||
ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
|
||||
|
||||
dst_http = dst_sk.http_client()
|
||||
src_http = src_sk.http_client()
|
||||
# run pull_timeline which will halt before downloading files
|
||||
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
|
||||
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
|
||||
pt_handle = PropagatingThread(
|
||||
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
|
||||
)
|
||||
pt_handle.start()
|
||||
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
|
||||
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
|
||||
|
||||
src_http = src_sk.http_client()
|
||||
term_before = src_http.timeline_status(tenant_id, timeline_id).term
|
||||
@@ -1922,7 +1929,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
|
||||
term_after = src_http.timeline_status(tenant_id, timeline_id).term
|
||||
assert term_after > term_before, f"term_after={term_after}, term_before={term_before}"
|
||||
|
||||
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
|
||||
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
pt_handle.join()
|
||||
|
||||
|
||||
@@ -324,14 +324,15 @@ files:
|
||||
help: 'Whether or not the replication slot wal_status is lost'
|
||||
key_labels:
|
||||
- slot_name
|
||||
values: [wal_status_is_lost]
|
||||
values: [wal_is_lost]
|
||||
query: |
|
||||
SELECT slot_name,
|
||||
CASE
|
||||
WHEN wal_status = 'lost' THEN 1
|
||||
ELSE 0
|
||||
END AS wal_status_is_lost
|
||||
END AS wal_is_lost
|
||||
FROM pg_replication_slots;
|
||||
|
||||
- filename: neon_collector_autoscaling.yml
|
||||
content: |
|
||||
collector_name: neon_collector_autoscaling
|
||||
|
||||
Reference in New Issue
Block a user