mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
19 Commits
release-84
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0df4286056 | ||
|
|
800f90f6a6 | ||
|
|
8e09ecf2ab | ||
|
|
c3534cea39 | ||
|
|
21d3d60cef | ||
|
|
b00db536bb | ||
|
|
149cbd1e0a | ||
|
|
7b949daf13 | ||
|
|
132b6154bb | ||
|
|
ad3519ebcb | ||
|
|
6173c0f44c | ||
|
|
fd916abf25 | ||
|
|
cd2e1fbc7c | ||
|
|
5df4a747e6 | ||
|
|
cbf442292b | ||
|
|
4d0c1e8b78 | ||
|
|
3158442a59 | ||
|
|
f006879fb7 | ||
|
|
a0d844dfed |
@@ -19,7 +19,7 @@
|
||||
!pageserver/
|
||||
!pgxn/
|
||||
!proxy/
|
||||
!object_storage/
|
||||
!endpoint_storage/
|
||||
!storage_scrubber/
|
||||
!safekeeper/
|
||||
!storage_broker/
|
||||
|
||||
12
.github/actions/run-python-test-set/action.yml
vendored
12
.github/actions/run-python-test-set/action.yml
vendored
@@ -133,6 +133,7 @@ runs:
|
||||
fi
|
||||
|
||||
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
|
||||
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
|
||||
rm -rf $PERF_REPORT_DIR
|
||||
|
||||
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
|
||||
@@ -209,11 +210,12 @@ runs:
|
||||
--verbose \
|
||||
-rA $TEST_SELECTION $EXTRA_PARAMS
|
||||
|
||||
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
|
||||
export REPORT_FROM="$PERF_REPORT_DIR"
|
||||
export REPORT_TO="$PLATFORM"
|
||||
scripts/generate_and_push_perf_report.sh
|
||||
fi
|
||||
- name: Upload performance report
|
||||
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: |
|
||||
export REPORT_FROM="${PERF_REPORT_DIR}"
|
||||
scripts/generate_and_push_perf_report.sh
|
||||
|
||||
- name: Upload compatibility snapshot
|
||||
# Note, that we use `github.base_ref` which is a target branch for a PR
|
||||
|
||||
2
.github/workflows/_meta.yml
vendored
2
.github/workflows/_meta.yml
vendored
@@ -165,5 +165,5 @@ jobs:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
CURRENT_SHA: ${{ github.sha }}
|
||||
run: |
|
||||
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
|
||||
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release.*$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
|
||||
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT
|
||||
|
||||
8
.github/workflows/fast-forward.yml
vendored
8
.github/workflows/fast-forward.yml
vendored
@@ -27,15 +27,17 @@ jobs:
|
||||
- name: Fast forwarding
|
||||
uses: sequoia-pgp/fast-forward@ea7628bedcb0b0b96e94383ada458d812fca4979
|
||||
# See https://docs.github.com/en/graphql/reference/enums#mergestatestatus
|
||||
if: ${{ github.event.pull_request.mergeable_state == 'clean' }}
|
||||
if: ${{ contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
|
||||
with:
|
||||
merge: true
|
||||
comment: on-error
|
||||
github_token: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
|
||||
- name: Comment if mergeable_state is not clean
|
||||
if: ${{ github.event.pull_request.mergeable_state != 'clean' }}
|
||||
if: ${{ !contains(fromJSON('["clean", "unstable"]'), github.event.pull_request.mergeable_state) }}
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
gh pr comment ${{ github.event.pull_request.number }} \
|
||||
--repo "${GITHUB_REPOSITORY}" \
|
||||
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\`."
|
||||
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\` or \`unstable\`."
|
||||
|
||||
56
Cargo.lock
generated
56
Cargo.lock
generated
@@ -2037,6 +2037,33 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "endpoint_storage"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"axum-extra",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
"futures",
|
||||
"http-body-util",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"test-log",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower 0.5.2",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum-map"
|
||||
version = "2.5.0"
|
||||
@@ -3998,33 +4025,6 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object_storage"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"axum-extra",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
"futures",
|
||||
"http-body-util",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"test-log",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower 0.5.2",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.20.2"
|
||||
@@ -4285,6 +4285,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_compaction",
|
||||
"pem",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -6001,6 +6002,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"pem",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
|
||||
@@ -40,7 +40,7 @@ members = [
|
||||
"libs/proxy/postgres-protocol2",
|
||||
"libs/proxy/postgres-types2",
|
||||
"libs/proxy/tokio-postgres2",
|
||||
"object_storage",
|
||||
"endpoint_storage",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
||||
@@ -89,7 +89,7 @@ RUN set -e \
|
||||
--bin storage_broker \
|
||||
--bin storage_controller \
|
||||
--bin proxy \
|
||||
--bin object_storage \
|
||||
--bin endpoint_storage \
|
||||
--bin neon_local \
|
||||
--bin storage_scrubber \
|
||||
--locked --release
|
||||
@@ -122,7 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin
|
||||
|
||||
|
||||
@@ -1677,7 +1677,7 @@ RUN set -e \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
|
||||
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
|
||||
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
|
||||
RUN set -e \
|
||||
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
|
||||
&& cd pgbouncer \
|
||||
|
||||
@@ -11,6 +11,14 @@ index bf6edcb..89b4c7f 100644
|
||||
|
||||
USE_PGXS = 1 # use pgxs if not in contrib directory
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
|
||||
index 9f2e171..f6e4f8d 100644
|
||||
--- a/regress/expected/init-extension.out
|
||||
+++ b/regress/expected/init-extension.out
|
||||
@@ -1,3 +1,2 @@
|
||||
SET client_min_messages = warning;
|
||||
CREATE EXTENSION pg_repack;
|
||||
-RESET client_min_messages;
|
||||
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
|
||||
index 8d0a94e..63b68bf 100644
|
||||
--- a/regress/expected/nosuper.out
|
||||
@@ -42,6 +50,14 @@ index 8d0a94e..63b68bf 100644
|
||||
INFO: repacking table "public.tbl_cluster"
|
||||
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||
DETAIL: query was: RESET lock_timeout
|
||||
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
|
||||
index 9f2e171..f6e4f8d 100644
|
||||
--- a/regress/sql/init-extension.sql
|
||||
+++ b/regress/sql/init-extension.sql
|
||||
@@ -1,3 +1,2 @@
|
||||
SET client_min_messages = warning;
|
||||
CREATE EXTENSION pg_repack;
|
||||
-RESET client_min_messages;
|
||||
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
|
||||
index 072f0fa..dbe60f8 100644
|
||||
--- a/regress/sql/nosuper.sql
|
||||
|
||||
@@ -18,12 +18,11 @@ use anyhow::{Context, Result, anyhow, bail};
|
||||
use clap::Parser;
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
|
||||
use control_plane::local_env::{
|
||||
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
|
||||
ObjectStorageConf, SafekeeperConf,
|
||||
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
|
||||
NeonLocalInitPageserverConf, SafekeeperConf,
|
||||
};
|
||||
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
|
||||
use control_plane::object_storage::ObjectStorage;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage_controller::{
|
||||
@@ -93,7 +92,7 @@ enum NeonLocalCmd {
|
||||
#[command(subcommand)]
|
||||
Safekeeper(SafekeeperCmd),
|
||||
#[command(subcommand)]
|
||||
ObjectStorage(ObjectStorageCmd),
|
||||
EndpointStorage(EndpointStorageCmd),
|
||||
#[command(subcommand)]
|
||||
Endpoint(EndpointCmd),
|
||||
#[command(subcommand)]
|
||||
@@ -460,14 +459,14 @@ enum SafekeeperCmd {
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
#[clap(about = "Manage object storage")]
|
||||
enum ObjectStorageCmd {
|
||||
Start(ObjectStorageStartCmd),
|
||||
Stop(ObjectStorageStopCmd),
|
||||
enum EndpointStorageCmd {
|
||||
Start(EndpointStorageStartCmd),
|
||||
Stop(EndpointStorageStopCmd),
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
#[clap(about = "Start object storage")]
|
||||
struct ObjectStorageStartCmd {
|
||||
struct EndpointStorageStartCmd {
|
||||
#[clap(short = 't', long, help = "timeout until we fail the command")]
|
||||
#[arg(default_value = "10s")]
|
||||
start_timeout: humantime::Duration,
|
||||
@@ -475,7 +474,7 @@ struct ObjectStorageStartCmd {
|
||||
|
||||
#[derive(clap::Args)]
|
||||
#[clap(about = "Stop object storage")]
|
||||
struct ObjectStorageStopCmd {
|
||||
struct EndpointStorageStopCmd {
|
||||
#[arg(value_enum, default_value = "fast")]
|
||||
#[clap(
|
||||
short = 'm',
|
||||
@@ -797,7 +796,9 @@ fn main() -> Result<()> {
|
||||
}
|
||||
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
|
||||
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
|
||||
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
|
||||
NeonLocalCmd::EndpointStorage(subcmd) => {
|
||||
rt.block_on(handle_endpoint_storage(&subcmd, env))
|
||||
}
|
||||
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
|
||||
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
|
||||
};
|
||||
@@ -1014,8 +1015,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
object_storage: ObjectStorageConf {
|
||||
port: OBJECT_STORAGE_DEFAULT_PORT,
|
||||
endpoint_storage: EndpointStorageConf {
|
||||
port: ENDPOINT_STORAGE_DEFAULT_PORT,
|
||||
},
|
||||
pg_distrib_dir: None,
|
||||
neon_distrib_dir: None,
|
||||
@@ -1735,12 +1736,15 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
|
||||
use ObjectStorageCmd::*;
|
||||
let storage = ObjectStorage::from_env(env);
|
||||
async fn handle_endpoint_storage(
|
||||
subcmd: &EndpointStorageCmd,
|
||||
env: &local_env::LocalEnv,
|
||||
) -> Result<()> {
|
||||
use EndpointStorageCmd::*;
|
||||
let storage = EndpointStorage::from_env(env);
|
||||
|
||||
// In tests like test_forward_compatibility or test_graceful_cluster_restart
|
||||
// old neon binaries (without object_storage) are present
|
||||
// old neon binaries (without endpoint_storage) are present
|
||||
if !storage.bin.exists() {
|
||||
eprintln!(
|
||||
"{} binary not found. Ignore if this is a compatibility test",
|
||||
@@ -1750,13 +1754,13 @@ async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::Local
|
||||
}
|
||||
|
||||
match subcmd {
|
||||
Start(ObjectStorageStartCmd { start_timeout }) => {
|
||||
Start(EndpointStorageStartCmd { start_timeout }) => {
|
||||
if let Err(e) = storage.start(start_timeout).await {
|
||||
eprintln!("object_storage start failed: {e}");
|
||||
eprintln!("endpoint_storage start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
Stop(ObjectStorageStopCmd { stop_mode }) => {
|
||||
Stop(EndpointStorageStopCmd { stop_mode }) => {
|
||||
let immediate = match stop_mode {
|
||||
StopMode::Fast => false,
|
||||
StopMode::Immediate => true,
|
||||
@@ -1866,10 +1870,10 @@ async fn handle_start_all_impl(
|
||||
}
|
||||
|
||||
js.spawn(async move {
|
||||
ObjectStorage::from_env(env)
|
||||
EndpointStorage::from_env(env)
|
||||
.start(&retry_timeout)
|
||||
.await
|
||||
.map_err(|e| e.context("start object_storage"))
|
||||
.map_err(|e| e.context("start endpoint_storage"))
|
||||
});
|
||||
})();
|
||||
|
||||
@@ -1968,9 +1972,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
}
|
||||
}
|
||||
|
||||
let storage = ObjectStorage::from_env(env);
|
||||
let storage = EndpointStorage::from_env(env);
|
||||
if let Err(e) = storage.stop(immediate) {
|
||||
eprintln!("object_storage stop failed: {:#}", e);
|
||||
eprintln!("endpoint_storage stop failed: {:#}", e);
|
||||
}
|
||||
|
||||
for ps_conf in &env.pageservers {
|
||||
|
||||
@@ -1,34 +1,33 @@
|
||||
use crate::background_process::{self, start_process, stop_process};
|
||||
use crate::local_env::LocalEnv;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::{Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use std::io::Write;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Directory within .neon which will be used by default for LocalFs remote storage.
|
||||
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
|
||||
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
|
||||
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
|
||||
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
|
||||
|
||||
pub struct ObjectStorage {
|
||||
pub struct EndpointStorage {
|
||||
pub bin: Utf8PathBuf,
|
||||
pub data_dir: Utf8PathBuf,
|
||||
pub pemfile: Utf8PathBuf,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl ObjectStorage {
|
||||
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
|
||||
ObjectStorage {
|
||||
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
|
||||
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
|
||||
impl EndpointStorage {
|
||||
pub fn from_env(env: &LocalEnv) -> EndpointStorage {
|
||||
EndpointStorage {
|
||||
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
|
||||
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
|
||||
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
|
||||
port: env.object_storage.port,
|
||||
port: env.endpoint_storage.port,
|
||||
}
|
||||
}
|
||||
|
||||
fn config_path(&self) -> Utf8PathBuf {
|
||||
self.data_dir.join("object_storage.json")
|
||||
self.data_dir.join("endpoint_storage.json")
|
||||
}
|
||||
|
||||
fn listen_addr(&self) -> Utf8PathBuf {
|
||||
@@ -49,7 +48,7 @@ impl ObjectStorage {
|
||||
let cfg = Cfg {
|
||||
listen: self.listen_addr(),
|
||||
pemfile: parent.join(self.pemfile.clone()),
|
||||
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
|
||||
local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
|
||||
r#type: "LocalFs".to_string(),
|
||||
};
|
||||
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
|
||||
@@ -59,24 +58,19 @@ impl ObjectStorage {
|
||||
}
|
||||
|
||||
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
|
||||
println!("Starting s3 proxy at {}", self.listen_addr());
|
||||
println!("Starting endpoint_storage at {}", self.listen_addr());
|
||||
std::io::stdout().flush().context("flush stdout")?;
|
||||
|
||||
let process_status_check = || async {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
let res = reqwest::Client::new()
|
||||
.get(format!("http://{}/metrics", self.listen_addr()))
|
||||
.send()
|
||||
.await;
|
||||
match res {
|
||||
Ok(response) if response.status().is_success() => Ok(true),
|
||||
Ok(_) => Err(anyhow!("Failed to query /metrics")),
|
||||
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
|
||||
let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
|
||||
match res.send().await {
|
||||
Ok(res) => Ok(res.status().is_success()),
|
||||
Err(_) => Ok(false),
|
||||
}
|
||||
};
|
||||
|
||||
let res = start_process(
|
||||
"object_storage",
|
||||
"endpoint_storage",
|
||||
&self.data_dir.clone().into_std_path_buf(),
|
||||
&self.bin.clone().into_std_path_buf(),
|
||||
vec![self.config_path().to_string()],
|
||||
@@ -94,14 +88,14 @@ impl ObjectStorage {
|
||||
}
|
||||
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
stop_process(immediate, "object_storage", &self.pid_file())
|
||||
stop_process(immediate, "endpoint_storage", &self.pid_file())
|
||||
}
|
||||
|
||||
fn log_file(&self) -> Utf8PathBuf {
|
||||
self.data_dir.join("object_storage.log")
|
||||
self.data_dir.join("endpoint_storage.log")
|
||||
}
|
||||
|
||||
fn pid_file(&self) -> Utf8PathBuf {
|
||||
self.data_dir.join("object_storage.pid")
|
||||
self.data_dir.join("endpoint_storage.pid")
|
||||
}
|
||||
}
|
||||
@@ -9,8 +9,8 @@
|
||||
mod background_process;
|
||||
pub mod broker;
|
||||
pub mod endpoint;
|
||||
pub mod endpoint_storage;
|
||||
pub mod local_env;
|
||||
pub mod object_storage;
|
||||
pub mod pageserver;
|
||||
pub mod postgresql_conf;
|
||||
pub mod safekeeper;
|
||||
|
||||
@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
|
||||
use utils::auth::encode_from_key_file;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
|
||||
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
|
||||
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
@@ -72,7 +72,7 @@ pub struct LocalEnv {
|
||||
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
|
||||
pub object_storage: ObjectStorageConf,
|
||||
pub endpoint_storage: EndpointStorageConf,
|
||||
|
||||
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
|
||||
// be propagated into each pageserver's configuration.
|
||||
@@ -110,7 +110,7 @@ pub struct OnDiskConfig {
|
||||
)]
|
||||
pub pageservers: Vec<PageServerConf>,
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub object_storage: ObjectStorageConf,
|
||||
pub endpoint_storage: EndpointStorageConf,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_hooks_api: Option<Url>,
|
||||
pub control_plane_compute_hook_api: Option<Url>,
|
||||
@@ -144,7 +144,7 @@ pub struct NeonLocalInitConf {
|
||||
pub storage_controller: Option<NeonStorageControllerConf>,
|
||||
pub pageservers: Vec<NeonLocalInitPageserverConf>,
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub object_storage: ObjectStorageConf,
|
||||
pub endpoint_storage: EndpointStorageConf,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_hooks_api: Option<Url>,
|
||||
pub generate_local_ssl_certs: bool,
|
||||
@@ -152,7 +152,7 @@ pub struct NeonLocalInitConf {
|
||||
|
||||
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct ObjectStorageConf {
|
||||
pub struct EndpointStorageConf {
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
@@ -413,8 +413,8 @@ impl LocalEnv {
|
||||
self.pg_dir(pg_version, "lib")
|
||||
}
|
||||
|
||||
pub fn object_storage_bin(&self) -> PathBuf {
|
||||
self.neon_distrib_dir.join("object_storage")
|
||||
pub fn endpoint_storage_bin(&self) -> PathBuf {
|
||||
self.neon_distrib_dir.join("endpoint_storage")
|
||||
}
|
||||
|
||||
pub fn pageserver_bin(&self) -> PathBuf {
|
||||
@@ -450,8 +450,8 @@ impl LocalEnv {
|
||||
self.base_data_dir.join("safekeepers").join(data_dir_name)
|
||||
}
|
||||
|
||||
pub fn object_storage_data_dir(&self) -> PathBuf {
|
||||
self.base_data_dir.join("object_storage")
|
||||
pub fn endpoint_storage_data_dir(&self) -> PathBuf {
|
||||
self.base_data_dir.join("endpoint_storage")
|
||||
}
|
||||
|
||||
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
|
||||
@@ -615,7 +615,7 @@ impl LocalEnv {
|
||||
control_plane_compute_hook_api: _,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
object_storage,
|
||||
endpoint_storage,
|
||||
} = on_disk_config;
|
||||
LocalEnv {
|
||||
base_data_dir: repopath.to_owned(),
|
||||
@@ -632,7 +632,7 @@ impl LocalEnv {
|
||||
control_plane_hooks_api,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
object_storage,
|
||||
endpoint_storage,
|
||||
}
|
||||
};
|
||||
|
||||
@@ -742,7 +742,7 @@ impl LocalEnv {
|
||||
control_plane_compute_hook_api: None,
|
||||
branch_name_mappings: self.branch_name_mappings.clone(),
|
||||
generate_local_ssl_certs: self.generate_local_ssl_certs,
|
||||
object_storage: self.object_storage.clone(),
|
||||
endpoint_storage: self.endpoint_storage.clone(),
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -849,7 +849,7 @@ impl LocalEnv {
|
||||
control_plane_api,
|
||||
generate_local_ssl_certs,
|
||||
control_plane_hooks_api,
|
||||
object_storage,
|
||||
endpoint_storage,
|
||||
} = conf;
|
||||
|
||||
// Find postgres binaries.
|
||||
@@ -901,7 +901,7 @@ impl LocalEnv {
|
||||
control_plane_hooks_api,
|
||||
branch_name_mappings: Default::default(),
|
||||
generate_local_ssl_certs,
|
||||
object_storage,
|
||||
endpoint_storage,
|
||||
};
|
||||
|
||||
if generate_local_ssl_certs {
|
||||
@@ -929,13 +929,13 @@ impl LocalEnv {
|
||||
.context("pageserver init failed")?;
|
||||
}
|
||||
|
||||
ObjectStorage::from_env(&env)
|
||||
EndpointStorage::from_env(&env)
|
||||
.init()
|
||||
.context("object storage init failed")?;
|
||||
|
||||
// setup remote remote location for default LocalFs remote storage
|
||||
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
|
||||
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
|
||||
std::fs::create_dir_all(env.base_data_dir.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR))?;
|
||||
|
||||
env.persist_config()
|
||||
}
|
||||
|
||||
53
docs/rfcs/2025-04-23-architecture-naming-conventions.md
Normal file
53
docs/rfcs/2025-04-23-architecture-naming-conventions.md
Normal file
@@ -0,0 +1,53 @@
|
||||
# Architecture Naming Scheme
|
||||
|
||||
## Summary
|
||||
|
||||
Neon computes are going to support multiple CPU architectures, including 64-bit
|
||||
x86 and ARM. Architecture naming schemes are fairly inconsistent, at least when
|
||||
it comes to these two architectures in particular. Sometimes 64-bit x86 is known
|
||||
as `amd64` and at other times `x86_64`. For 64-bit ARM, it's a similar story
|
||||
with `arm64` and `aarch64`.
|
||||
|
||||
## Motivation
|
||||
|
||||
Consistency when referring to these architectures across the platform in
|
||||
important not only so that everyone is on the same page, but also because we
|
||||
have architecture-dependent portions of the platform. One notable example is
|
||||
remote extensions. Remote extensions need to be compiled and packaged into
|
||||
compressed tarballs that are then downloaded by computes on demand. Downloading
|
||||
a tarball for the wrong architecture will cause `dlopen(3)` to fail when
|
||||
Postgres attempts to load the library. The compressed tarballs are located in an
|
||||
S3 bucket with object keys of the form
|
||||
`$BUILD_TAG/$ARCH/$PG_VERSION_NUM/${EXTENSION_NAME}.tar.zst` to mitigate the
|
||||
potential for failure. The build and deployment pipeline for remote extensions
|
||||
needs to be in lock step with the code in the compute that actually fetches
|
||||
remote extensions.
|
||||
|
||||
## Impacted Components
|
||||
|
||||
- Control Plane when persisting the target architecture in compute flags if
|
||||
specified.
|
||||
- Austocaling when scheduling compute pods.
|
||||
- `compute_ctl` when downloading remote extensions.
|
||||
- Build and deployment pipeline for remote extensions.
|
||||
|
||||
## Prior Art for CPU Architecture Names
|
||||
|
||||
- [Rust](https://doc.rust-lang.org/std/env/consts/constant.ARCH.html)
|
||||
- `x86_64`
|
||||
- `aarch64`
|
||||
- [Go](https://pkg.go.dev/internal/goarch#pkg-constants)
|
||||
- `amd64`
|
||||
- `arm64`
|
||||
- Kubernetes
|
||||
- Because Kubernetes is written in Go, it inherits the naming scheme.
|
||||
|
||||
Going all in on either the Rust naming scheme or the Go naming scheme can save
|
||||
us a branch when getting the architecture, so we should pick one and force the
|
||||
other side to conform.
|
||||
|
||||
## Decision
|
||||
|
||||
The heart of our platform is Kubernetes. We will inherit the Go naming
|
||||
scheme just like Kubernetes. This makes things easy for the autoscaling team
|
||||
when they do pod scheduling on nodes.
|
||||
@@ -1,5 +1,5 @@
|
||||
[package]
|
||||
name = "object_storage"
|
||||
name = "endpoint_storage"
|
||||
version = "0.0.1"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
@@ -2,7 +2,7 @@ use anyhow::anyhow;
|
||||
use axum::body::{Body, Bytes};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Router, http::StatusCode};
|
||||
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
|
||||
use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
|
||||
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
|
||||
@@ -46,12 +46,12 @@ async fn metrics() -> Result {
|
||||
|
||||
async fn get(S3Path { path }: S3Path, state: State) -> Result {
|
||||
info!(%path, "downloading");
|
||||
let download_err = |e| {
|
||||
if let DownloadError::NotFound = e {
|
||||
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
|
||||
let download_err = |err| {
|
||||
if let DownloadError::NotFound = err {
|
||||
info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
|
||||
return not_found(&path);
|
||||
}
|
||||
internal_error(e, &path, "downloading")
|
||||
internal_error(err, &path, "downloading")
|
||||
};
|
||||
let cancel = state.cancel.clone();
|
||||
let opts = &DownloadOpts::default();
|
||||
@@ -249,7 +249,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let proxy = Storage {
|
||||
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
|
||||
auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
|
||||
storage,
|
||||
cancel: cancel.clone(),
|
||||
max_upload_file_limit: usize::MAX,
|
||||
@@ -343,14 +343,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
|
||||
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
|
||||
fn token() -> String {
|
||||
let claims = object_storage::Claims {
|
||||
let claims = endpoint_storage::Claims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
exp: u64::MAX,
|
||||
};
|
||||
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
|
||||
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
|
||||
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
|
||||
jsonwebtoken::encode(&header, &claims, &key).unwrap()
|
||||
}
|
||||
|
||||
@@ -364,7 +364,10 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
|
||||
vec![ENDPOINT_ID, "ep-ololo"]
|
||||
)
|
||||
.skip(1);
|
||||
// first one is fully valid path, second path is valid for GET as
|
||||
// read paths may have different endpoint if tenant and timeline matches
|
||||
// (needed for prewarming RO->RW replica)
|
||||
.skip(2);
|
||||
|
||||
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
|
||||
info!(%uri, %method, %tenant, %timeline, %endpoint);
|
||||
@@ -475,6 +478,16 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
requests_chain(chain.into_iter(), |_| token()).await;
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn read_other_endpoint_data() {
|
||||
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
|
||||
let chain = vec![
|
||||
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
|
||||
];
|
||||
requests_chain(chain.into_iter(), |_| token()).await;
|
||||
}
|
||||
|
||||
fn delete_prefix_token(uri: &str) -> String {
|
||||
use serde::Serialize;
|
||||
let parts = uri.split("/").collect::<Vec<&str>>();
|
||||
@@ -482,7 +495,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
struct PrefixClaims {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
endpoint_id: Option<object_storage::EndpointId>,
|
||||
endpoint_id: Option<endpoint_storage::EndpointId>,
|
||||
exp: u64,
|
||||
}
|
||||
let claims = PrefixClaims {
|
||||
@@ -492,7 +505,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
exp: u64::MAX,
|
||||
};
|
||||
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
|
||||
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
|
||||
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
|
||||
jsonwebtoken::encode(&header, &claims, &key).unwrap()
|
||||
}
|
||||
|
||||
@@ -169,10 +169,19 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "decoding token"))?;
|
||||
|
||||
// Read paths may have different endpoint ids. For readonly -> readwrite replica
|
||||
// prewarming, endpoint must read other endpoint's data.
|
||||
let endpoint_id = if parts.method == axum::http::Method::GET {
|
||||
claims.endpoint_id.clone()
|
||||
} else {
|
||||
path.endpoint_id.clone()
|
||||
};
|
||||
|
||||
let route = Claims {
|
||||
tenant_id: path.tenant_id,
|
||||
timeline_id: path.timeline_id,
|
||||
endpoint_id: path.endpoint_id.clone(),
|
||||
endpoint_id,
|
||||
exp: claims.exp,
|
||||
};
|
||||
if route != claims {
|
||||
@@ -1,4 +1,4 @@
|
||||
//! `object_storage` is a service which provides API for uploading and downloading
|
||||
//! `endpoint_storage` is a service which provides API for uploading and downloading
|
||||
//! files. It is used by compute and control plane for accessing LFC prewarm data.
|
||||
//! This service is deployed either as a separate component or as part of compute image
|
||||
//! for large computes.
|
||||
@@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let config: String = std::env::args().skip(1).take(1).collect();
|
||||
if config.is_empty() {
|
||||
anyhow::bail!("Usage: object_storage config.json")
|
||||
anyhow::bail!("Usage: endpoint_storage config.json")
|
||||
}
|
||||
info!("Reading config from {config}");
|
||||
let config = std::fs::read_to_string(config.clone())?;
|
||||
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
info!("Reading pemfile from {}", config.pemfile.clone());
|
||||
let pemfile = std::fs::read(config.pemfile.clone())?;
|
||||
info!("Loading public key from {}", config.pemfile.clone());
|
||||
let auth = object_storage::JwtAuth::new(&pemfile)?;
|
||||
let auth = endpoint_storage::JwtAuth::new(&pemfile)?;
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
|
||||
info!("listening on {}", listener.local_addr().unwrap());
|
||||
@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
app::check_storage_permissions(&storage, cancel.clone()).await?;
|
||||
|
||||
let proxy = std::sync::Arc::new(object_storage::Storage {
|
||||
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
|
||||
auth,
|
||||
storage,
|
||||
cancel: cancel.clone(),
|
||||
@@ -242,13 +242,22 @@ impl RemoteExtSpec {
|
||||
|
||||
match self.extension_data.get(real_ext_name) {
|
||||
Some(_ext_data) => {
|
||||
// We have decided to use the Go naming convention due to Kubernetes.
|
||||
|
||||
let arch = match std::env::consts::ARCH {
|
||||
"x86_64" => "amd64",
|
||||
"aarch64" => "arm64",
|
||||
arch => arch,
|
||||
};
|
||||
|
||||
// Construct the path to the extension archive
|
||||
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
|
||||
//
|
||||
// Keep it in sync with path generation in
|
||||
// https://github.com/neondatabase/build-custom-extensions/tree/main
|
||||
let archive_path_str =
|
||||
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
|
||||
let archive_path_str = format!(
|
||||
"{build_tag}/{arch}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"
|
||||
);
|
||||
Ok((
|
||||
real_ext_name.to_string(),
|
||||
RemotePath::from_string(&archive_path_str)?,
|
||||
|
||||
@@ -78,6 +78,7 @@ metrics.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
|
||||
@@ -68,6 +68,13 @@ pub(crate) struct Args {
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
/// State shared by all clients
|
||||
#[derive(Debug)]
|
||||
struct SharedState {
|
||||
start_work_barrier: tokio::sync::Barrier,
|
||||
live_stats: LiveStats,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
@@ -240,24 +247,26 @@ async fn main_impl(
|
||||
all_ranges
|
||||
};
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = args.num_clients.get() * timelines.len();
|
||||
let num_main_impl = 1;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
));
|
||||
let shared_state = Arc::new(SharedState {
|
||||
start_work_barrier: tokio::sync::Barrier::new(
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
),
|
||||
live_stats: LiveStats::default(),
|
||||
});
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let ss = shared_state.clone();
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&live_stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
start_work_barrier.wait().await;
|
||||
ss.start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let stats = &ss.live_stats;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let missed = stats.missed.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
@@ -270,14 +279,12 @@ async fn main_impl(
|
||||
}
|
||||
});
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let rps_period = args
|
||||
.per_client_rate
|
||||
.map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
|
||||
let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
|
||||
let live_stats = live_stats.clone();
|
||||
let start_work_barrier = start_work_barrier.clone();
|
||||
let ss = shared_state.clone();
|
||||
let cancel = cancel.clone();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == worker_id.timeline)
|
||||
@@ -287,85 +294,8 @@ async fn main_impl(
|
||||
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
|
||||
.unwrap();
|
||||
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
let client =
|
||||
pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
|
||||
.unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -387,7 +317,7 @@ async fn main_impl(
|
||||
};
|
||||
|
||||
info!("waiting for everything to become ready");
|
||||
start_work_barrier.wait().await;
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
info!("work started");
|
||||
if let Some(runtime) = args.runtime {
|
||||
tokio::time::sleep(runtime.into()).await;
|
||||
@@ -416,3 +346,91 @@ async fn main_impl(
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
shared_state
|
||||
.live_stats
|
||||
.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -416,8 +416,18 @@ fn start_pageserver(
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
.block_on(async {
|
||||
let tls_config = storage_broker::ClientTlsConfig::new().ca_certificates(
|
||||
conf.ssl_ca_certs
|
||||
.iter()
|
||||
.map(pem::encode)
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
);
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
|
||||
storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
tls_config,
|
||||
)
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -17,9 +17,10 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pem::Pem;
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use reqwest::{Certificate, Url};
|
||||
use reqwest::Url;
|
||||
use storage_broker::Uri;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::logging::{LogFormat, SecretString};
|
||||
@@ -67,8 +68,8 @@ pub struct PageServerConf {
|
||||
/// Period to reload certificate and private key from files.
|
||||
/// Default: 60s.
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
/// Trusted root CA certificates to use in https APIs in PEM format.
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
|
||||
/// Current availability zone. Used for traffic metrics.
|
||||
pub availability_zone: Option<String>,
|
||||
@@ -497,7 +498,10 @@ impl PageServerConf {
|
||||
ssl_ca_certs: match ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = std::fs::read(ssl_ca_file)?;
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
pem::parse_many(&buf)?
|
||||
.into_iter()
|
||||
.filter(|pem| pem.tag() == "CERTIFICATE")
|
||||
.collect()
|
||||
}
|
||||
None => Vec::new(),
|
||||
},
|
||||
|
||||
@@ -263,7 +263,9 @@ where
|
||||
while let Some((tenant_id, tenant)) = tenants.next().await {
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
for timeline in tenant.list_timelines() {
|
||||
let timelines = tenant.list_timelines();
|
||||
let timelines_len = timelines.len();
|
||||
for timeline in timelines {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
match TimelineSnapshot::collect(&timeline, ctx) {
|
||||
@@ -289,6 +291,11 @@ where
|
||||
tenant_resident_size += timeline.resident_physical_size();
|
||||
}
|
||||
|
||||
if timelines_len == 0 {
|
||||
// Force set it to 1 byte to avoid not being reported -- all timelines are offloaded.
|
||||
tenant_resident_size = 1;
|
||||
}
|
||||
|
||||
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
|
||||
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use pageserver_api::upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
|
||||
ValidateRequestTenant, ValidateResponse,
|
||||
};
|
||||
use reqwest::Certificate;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -76,8 +77,8 @@ impl StorageControllerUpcallClient {
|
||||
client = client.default_headers(headers);
|
||||
}
|
||||
|
||||
for ssl_ca_cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(ssl_ca_cert.clone());
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
|
||||
Ok(Some(Self {
|
||||
|
||||
@@ -1285,6 +1285,10 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
if query.is_empty() {
|
||||
return Ok(BTreeMap::default());
|
||||
}
|
||||
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(
|
||||
query.total_keyspace(),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
|
||||
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
|
||||
use reqwest::Method;
|
||||
use reqwest::{Certificate, Method};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
@@ -34,7 +34,7 @@ impl Client {
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
|
||||
@@ -803,7 +803,13 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
|
||||
if (forkNum == MAIN_FORKNUM)
|
||||
mdcreate(reln, INIT_FORKNUM, true);
|
||||
#else
|
||||
mdcreate(reln, forkNum, isRedo);
|
||||
#endif
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -1973,6 +1979,10 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (!IsParallelWorker())
|
||||
mdcreate(reln, INIT_FORKNUM, true);
|
||||
#endif
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -1995,12 +2005,14 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
* FIXME: should we pass isRedo true to create the tablespace dir if it
|
||||
* doesn't exist? Is it needed?
|
||||
*/
|
||||
#ifndef DEBUG_COMPARE_LOCAL
|
||||
if (!IsParallelWorker())
|
||||
{
|
||||
#ifndef DEBUG_COMPARE_LOCAL
|
||||
mdcreate(reln, MAIN_FORKNUM, false);
|
||||
#else
|
||||
mdcreate(reln, INIT_FORKNUM, false);
|
||||
mdcreate(reln, INIT_FORKNUM, true);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2099,12 +2111,12 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
#ifndef DEBUG_COMPARE_LOCAL
|
||||
/* use isRedo == true, so that we drop it immediately */
|
||||
mdunlink(rinfob, forknum, true);
|
||||
#else
|
||||
mdunlink(rinfob, INIT_FORKNUM, true);
|
||||
#endif
|
||||
}
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
mdunlink(rinfob, INIT_FORKNUM, true);
|
||||
#endif
|
||||
}
|
||||
|
||||
unlogged_build_rel = NULL;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
|
||||
@@ -91,6 +91,7 @@ mod jemalloc;
|
||||
mod logging;
|
||||
mod metrics;
|
||||
mod parse;
|
||||
mod pglb;
|
||||
mod protocol2;
|
||||
mod proxy;
|
||||
mod rate_limiter;
|
||||
|
||||
193
proxy/src/pglb/inprocess.rs
Normal file
193
proxy/src/pglb/inprocess.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
#![allow(dead_code, reason = "TODO: work in progress")]
|
||||
|
||||
use std::pin::{Pin, pin};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, io};
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream, ReadBuf};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
const STREAM_CHANNEL_SIZE: usize = 16;
|
||||
const MAX_STREAM_BUFFER_SIZE: usize = 4096;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Connection {
|
||||
stream_sender: mpsc::Sender<Stream>,
|
||||
stream_receiver: mpsc::Receiver<Stream>,
|
||||
stream_id_counter: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new() -> (Connection, Connection) {
|
||||
let (sender_a, receiver_a) = mpsc::channel(STREAM_CHANNEL_SIZE);
|
||||
let (sender_b, receiver_b) = mpsc::channel(STREAM_CHANNEL_SIZE);
|
||||
|
||||
let stream_id_counter = Arc::new(AtomicUsize::new(1));
|
||||
|
||||
let conn_a = Connection {
|
||||
stream_sender: sender_a,
|
||||
stream_receiver: receiver_b,
|
||||
stream_id_counter: Arc::clone(&stream_id_counter),
|
||||
};
|
||||
let conn_b = Connection {
|
||||
stream_sender: sender_b,
|
||||
stream_receiver: receiver_a,
|
||||
stream_id_counter,
|
||||
};
|
||||
|
||||
(conn_a, conn_b)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn next_stream_id(&self) -> StreamId {
|
||||
StreamId(self.stream_id_counter.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
|
||||
pub async fn open_stream(&self) -> io::Result<Stream> {
|
||||
let (local, remote) = tokio::io::duplex(MAX_STREAM_BUFFER_SIZE);
|
||||
let stream_id = self.next_stream_id();
|
||||
tracing::Span::current().record("stream_id", stream_id.0);
|
||||
|
||||
let local = Stream {
|
||||
inner: local,
|
||||
id: stream_id,
|
||||
};
|
||||
let remote = Stream {
|
||||
inner: remote,
|
||||
id: stream_id,
|
||||
};
|
||||
|
||||
self.stream_sender
|
||||
.send(remote)
|
||||
.await
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
Ok(local)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
|
||||
pub async fn accept_stream(&mut self) -> io::Result<Option<Stream>> {
|
||||
Ok(self.stream_receiver.recv().await.inspect(|stream| {
|
||||
tracing::Span::current().record("stream_id", stream.id.0);
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct StreamId(usize);
|
||||
|
||||
impl fmt::Display for StreamId {
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Proper closing. Currently Streams can outlive their Connections.
|
||||
// Carry WeakSender and check strong_count?
|
||||
#[derive(Debug)]
|
||||
pub struct Stream {
|
||||
inner: DuplexStream,
|
||||
id: StreamId,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
#[inline]
|
||||
pub fn id(&self) -> StreamId {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for Stream {
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
|
||||
#[inline]
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
pin!(&mut self.inner).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for Stream {
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
|
||||
#[inline]
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
pin!(&mut self.inner).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
|
||||
#[inline]
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
pin!(&mut self.inner).poll_flush(cx)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
|
||||
#[inline]
|
||||
fn poll_shutdown(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
pin!(&mut self.inner).poll_shutdown(cx)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
|
||||
#[inline]
|
||||
fn poll_write_vectored(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[io::IoSlice<'_>],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
pin!(&mut self.inner).poll_write_vectored(cx, bufs)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn is_write_vectored(&self) -> bool {
|
||||
self.inner.is_write_vectored()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_roundtrip() {
|
||||
let (client, mut server) = Connection::new();
|
||||
|
||||
let server_task = tokio::spawn(async move {
|
||||
while let Some(mut stream) = server.accept_stream().await.unwrap() {
|
||||
tokio::spawn(async move {
|
||||
let mut buf = [0; 64];
|
||||
loop {
|
||||
match stream.read(&mut buf).await.unwrap() {
|
||||
0 => break,
|
||||
n => stream.write(&buf[..n]).await.unwrap(),
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
let mut stream = client.open_stream().await.unwrap();
|
||||
stream.write_all(b"hello!").await.unwrap();
|
||||
let mut buf = [0; 64];
|
||||
let n = stream.read(&mut buf).await.unwrap();
|
||||
assert_eq!(n, 6);
|
||||
assert_eq!(&buf[..n], b"hello!");
|
||||
|
||||
drop(stream);
|
||||
drop(client);
|
||||
server_task.await.unwrap();
|
||||
}
|
||||
}
|
||||
1
proxy/src/pglb/mod.rs
Normal file
1
proxy/src/pglb/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod inprocess;
|
||||
@@ -55,6 +55,7 @@ tokio-util = { workspace = true }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
|
||||
@@ -14,9 +14,9 @@ use clap::{ArgAction, Parser};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use metrics::set_build_info_metric;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
|
||||
@@ -24,8 +24,8 @@ use safekeeper::defaults::{
|
||||
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
|
||||
};
|
||||
use safekeeper::{
|
||||
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
|
||||
control_file, http, wal_backup, wal_service,
|
||||
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
|
||||
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
|
||||
};
|
||||
use sd_notify::NotifyState;
|
||||
use storage_broker::{DEFAULT_ENDPOINT, Uri};
|
||||
@@ -216,16 +216,21 @@ struct Args {
|
||||
ssl_cert_file: Utf8PathBuf,
|
||||
/// Period to reload certificate and private key from files.
|
||||
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
#[arg(long)]
|
||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
/// Flag to use https for requests to peer's safekeeper API.
|
||||
#[arg(long)]
|
||||
pub use_https_safekeeper_api: bool,
|
||||
use_https_safekeeper_api: bool,
|
||||
/// Path to the JWT auth token used to authenticate with other safekeepers.
|
||||
#[arg(long)]
|
||||
auth_token_path: Option<Utf8PathBuf>,
|
||||
/// Enable TLS in WAL service API.
|
||||
/// Does not force TLS: the client negotiates TLS usage during the handshake.
|
||||
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
|
||||
#[arg(long)]
|
||||
enable_tls_wal_service_api: bool,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -373,7 +378,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
Some(ssl_ca_file) => {
|
||||
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
pem::parse_many(&buf)?
|
||||
.into_iter()
|
||||
.filter(|pem| pem.tag() == "CERTIFICATE")
|
||||
.collect()
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
@@ -416,6 +424,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
||||
ssl_ca_certs,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
@@ -515,6 +524,36 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
info!("running in current thread runtime");
|
||||
}
|
||||
|
||||
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api {
|
||||
let ssl_key_file = conf.ssl_key_file.clone();
|
||||
let ssl_cert_file = conf.ssl_cert_file.clone();
|
||||
let ssl_cert_reload_period = conf.ssl_cert_reload_period;
|
||||
|
||||
// Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
|
||||
// task is run in this runtime.
|
||||
let cert_resolver = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
|
||||
.spawn(async move {
|
||||
ReloadingCertificateResolver::new(
|
||||
"main",
|
||||
&ssl_key_file,
|
||||
&ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
)
|
||||
.await
|
||||
})
|
||||
.await??;
|
||||
|
||||
let config = rustls::ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(cert_resolver);
|
||||
|
||||
Some(Arc::new(config))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let wal_service_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||
@@ -522,6 +561,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
conf.clone(),
|
||||
pg_listener,
|
||||
Scope::SafekeeperData,
|
||||
conf.enable_tls_wal_service_api
|
||||
.then(|| tls_server_config.clone())
|
||||
.flatten(),
|
||||
global_timelines.clone(),
|
||||
))
|
||||
// wrap with task name for error reporting
|
||||
@@ -550,6 +592,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
conf.clone(),
|
||||
pg_listener_tenant_only,
|
||||
Scope::Tenant,
|
||||
conf.enable_tls_wal_service_api
|
||||
.then(|| tls_server_config.clone())
|
||||
.flatten(),
|
||||
global_timelines.clone(),
|
||||
))
|
||||
// wrap with task name for error reporting
|
||||
@@ -575,6 +620,7 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
.spawn(http::task_main_https(
|
||||
conf.clone(),
|
||||
https_listener,
|
||||
tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
|
||||
global_timelines.clone(),
|
||||
))
|
||||
.map(|res| ("HTTPS service main".to_owned(), res));
|
||||
|
||||
@@ -24,6 +24,15 @@ use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig {
|
||||
storage_broker::ClientTlsConfig::new().ca_certificates(
|
||||
conf.ssl_ca_certs
|
||||
.iter()
|
||||
.map(pem::encode)
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
)
|
||||
}
|
||||
|
||||
/// Push once in a while data about all active timelines to the broker.
|
||||
async fn push_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
@@ -37,8 +46,11 @@ async fn push_loop(
|
||||
|
||||
let active_timelines_set = global_timelines.get_global_broker_active_set();
|
||||
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
|
||||
let outbound = async_stream::stream! {
|
||||
@@ -81,8 +93,11 @@ async fn pull_loop(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
@@ -134,8 +149,11 @@ async fn discover_loop(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![TypeSubscription {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
pub mod routes;
|
||||
use std::sync::Arc;
|
||||
|
||||
use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
pub use routes::make_router;
|
||||
pub use safekeeper_api::models;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -28,21 +27,10 @@ pub async fn task_main_http(
|
||||
pub async fn task_main_https(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
https_listener: std::net::TcpListener,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
let cert_resolver = ReloadingCertificateResolver::new(
|
||||
"main",
|
||||
&conf.ssl_key_file,
|
||||
&conf.ssl_cert_file,
|
||||
conf.ssl_cert_reload_period,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let server_config = rustls::ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(cert_resolver);
|
||||
|
||||
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
|
||||
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
|
||||
|
||||
let router = make_router(conf, global_timelines)
|
||||
.build()
|
||||
|
||||
@@ -14,6 +14,7 @@ use http_utils::json::{json_request, json_response};
|
||||
use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param};
|
||||
use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use pem::Pem;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
@@ -230,14 +231,20 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let conf = get_conf(&request);
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
let resp = pull_timeline::handle_request(
|
||||
data,
|
||||
conf.sk_auth_token.clone(),
|
||||
conf.ssl_ca_certs.clone(),
|
||||
global_timelines,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let ca_certs = conf
|
||||
.ssl_ca_certs
|
||||
.iter()
|
||||
.map(Pem::contents)
|
||||
.map(reqwest::Certificate::from_der)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
|
||||
})?;
|
||||
|
||||
let resp =
|
||||
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@ use std::time::Duration;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::Lazy;
|
||||
use pem::Pem;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_broker::Uri;
|
||||
use tokio::runtime::Runtime;
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
@@ -120,8 +120,9 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
pub enable_tls_wal_service_api: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -172,6 +173,7 @@ impl SafeKeeperConf {
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
enable_tls_wal_service_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,3 +211,12 @@ pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
.build()
|
||||
.expect("Failed to create WAL backup runtime")
|
||||
});
|
||||
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background worker")
|
||||
.worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background runtime")
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::time::SystemTime;
|
||||
use anyhow::{Context, bail};
|
||||
use futures::StreamExt;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::{PeerInfo, TimelineStatus};
|
||||
@@ -241,7 +242,7 @@ async fn recover(
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
|
||||
@@ -29,6 +29,7 @@ pub async fn task_main(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
pg_listener: std::net::TcpListener,
|
||||
allowed_auth_scope: Scope,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Tokio's from_std won't do this for us, per its comment.
|
||||
@@ -43,9 +44,10 @@ pub async fn task_main(
|
||||
let conf = conf.clone();
|
||||
let conn_id = issue_connection_id(&mut connection_count);
|
||||
let global_timelines = global_timelines.clone();
|
||||
let tls_config = tls_config.clone();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
|
||||
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, tls_config, global_timelines).await {
|
||||
error!("connection handler exited: {}", err);
|
||||
}
|
||||
}
|
||||
@@ -61,6 +63,7 @@ async fn handle_socket(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conn_id: ConnectionId,
|
||||
allowed_auth_scope: Scope,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<(), QueryError> {
|
||||
socket.set_nodelay(true)?;
|
||||
@@ -110,7 +113,8 @@ async fn handle_socket(
|
||||
auth_pair,
|
||||
global_timelines,
|
||||
);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
|
||||
let pgbackend =
|
||||
PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
pgbackend
|
||||
|
||||
@@ -185,6 +185,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
ssl_cert_reload_period: Duration::ZERO,
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
enable_tls_wal_service_api: false,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -87,7 +87,12 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
None => storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap(),
|
||||
};
|
||||
|
||||
let ttid = ProtoTenantTimelineId {
|
||||
@@ -119,7 +124,12 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
|
||||
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
None => storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap(),
|
||||
};
|
||||
let mut counter: u64 = 0;
|
||||
|
||||
@@ -164,7 +174,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
let h = tokio::spawn(progress_reporter(counters.clone()));
|
||||
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
|
||||
let c = storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
for i in 0..args.num_subs {
|
||||
let c = Some(c.clone());
|
||||
|
||||
@@ -4,7 +4,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use proto::broker_service_client::BrokerServiceClient;
|
||||
use tonic::Status;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
// Code generated by protobuf.
|
||||
@@ -20,6 +20,7 @@ pub mod metrics;
|
||||
|
||||
// Re-exports to avoid direct tonic dependency in user crates.
|
||||
pub use hyper::Uri;
|
||||
pub use tonic::transport::{Certificate, ClientTlsConfig};
|
||||
pub use tonic::{Code, Request, Streaming};
|
||||
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
@@ -38,7 +39,11 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
//
|
||||
// NB: this function is not async, but still must be run on a tokio runtime thread
|
||||
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
|
||||
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
|
||||
pub fn connect<U>(
|
||||
endpoint: U,
|
||||
keepalive_interval: Duration,
|
||||
tls_config: ClientTlsConfig,
|
||||
) -> anyhow::Result<BrokerClientChannel>
|
||||
where
|
||||
U: std::convert::TryInto<Uri>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static,
|
||||
@@ -54,8 +59,7 @@ where
|
||||
rustls::crypto::ring::default_provider()
|
||||
.install_default()
|
||||
.ok();
|
||||
let tls = ClientTlsConfig::new();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls_config)?;
|
||||
}
|
||||
tonic_endpoint = tonic_endpoint
|
||||
.http2_keep_alive_interval(keepalive_interval)
|
||||
|
||||
@@ -151,11 +151,39 @@ impl Service {
|
||||
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
|
||||
remaining.len()
|
||||
);
|
||||
if remaining.len() >= 2 {
|
||||
let target_sk_count = timeline_persistence.sk_set.len();
|
||||
let quorum_size = match target_sk_count {
|
||||
0 => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"timeline configured without any safekeepers",
|
||||
)));
|
||||
}
|
||||
1 | 2 => {
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
// In test settings, it is allowed to have one or two safekeepers
|
||||
target_sk_count
|
||||
}
|
||||
#[cfg(not(feature = "testing"))]
|
||||
{
|
||||
// The region is misconfigured: we need at least three safekeepers to be configured
|
||||
// in order to schedule work to them
|
||||
tracing::warn!(
|
||||
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
|
||||
timeline_persistence.sk_set
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"couldn't find at least 3 safekeepers to put timeline to"
|
||||
)));
|
||||
}
|
||||
}
|
||||
_ => target_sk_count / 2 + 1,
|
||||
};
|
||||
let success_count = target_sk_count - remaining.len();
|
||||
if success_count < quorum_size {
|
||||
// Failure
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"not enough successful reconciliations to reach quorum, please retry: {} errored",
|
||||
remaining.len()
|
||||
"not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -492,8 +520,6 @@ impl Service {
|
||||
pub(crate) async fn safekeepers_for_new_timeline(
|
||||
&self,
|
||||
) -> Result<Vec<SafekeeperInfo>, ApiError> {
|
||||
// Number of safekeepers in different AZs we are looking for
|
||||
let wanted_count = 3;
|
||||
let mut all_safekeepers = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked
|
||||
@@ -532,6 +558,19 @@ impl Service {
|
||||
sk.1.id.0,
|
||||
)
|
||||
});
|
||||
// Number of safekeepers in different AZs we are looking for
|
||||
let wanted_count = match all_safekeepers.len() {
|
||||
0 => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"couldn't find any active safekeeper for new timeline",
|
||||
)));
|
||||
}
|
||||
// Have laxer requirements on testig mode as we don't want to
|
||||
// spin up three safekeepers for every single test
|
||||
#[cfg(feature = "testing")]
|
||||
1 | 2 => all_safekeepers.len(),
|
||||
_ => 3,
|
||||
};
|
||||
let mut sks = Vec::new();
|
||||
let mut azs = HashSet::new();
|
||||
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
|
||||
|
||||
@@ -417,14 +417,14 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
cmd.append(f"--instance-id={instance_id}")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def object_storage_start(self, timeout_in_seconds: int | None = None):
|
||||
cmd = ["object-storage", "start"]
|
||||
def endpoint_storage_start(self, timeout_in_seconds: int | None = None):
|
||||
cmd = ["endpoint-storage", "start"]
|
||||
if timeout_in_seconds is not None:
|
||||
cmd.append(f"--start-timeout={timeout_in_seconds}s")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def object_storage_stop(self, immediate: bool):
|
||||
cmd = ["object-storage", "stop"]
|
||||
def endpoint_storage_stop(self, immediate: bool):
|
||||
cmd = ["endpoint-storage", "stop"]
|
||||
if immediate:
|
||||
cmd.extend(["-m", "immediate"])
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
@@ -1029,7 +1029,7 @@ class NeonEnvBuilder:
|
||||
|
||||
self.env.broker.assert_no_errors()
|
||||
|
||||
self.env.object_storage.assert_no_errors()
|
||||
self.env.endpoint_storage.assert_no_errors()
|
||||
|
||||
try:
|
||||
self.overlay_cleanup_teardown()
|
||||
@@ -1126,7 +1126,7 @@ class NeonEnv:
|
||||
pagectl_env_vars["RUST_LOG"] = self.rust_log_override
|
||||
self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath)
|
||||
|
||||
self.object_storage = ObjectStorage(self)
|
||||
self.endpoint_storage = EndpointStorage(self)
|
||||
|
||||
# The URL for the pageserver to use as its control_plane_api config
|
||||
if config.storage_controller_port_override is not None:
|
||||
@@ -1183,7 +1183,7 @@ class NeonEnv:
|
||||
},
|
||||
"safekeepers": [],
|
||||
"pageservers": [],
|
||||
"object_storage": {"port": self.port_distributor.get_port()},
|
||||
"endpoint_storage": {"port": self.port_distributor.get_port()},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
}
|
||||
|
||||
@@ -1420,7 +1420,7 @@ class NeonEnv:
|
||||
self.storage_controller.on_safekeeper_deploy(sk_id, body)
|
||||
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
|
||||
|
||||
self.object_storage.start(timeout_in_seconds=timeout_in_seconds)
|
||||
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
|
||||
|
||||
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
|
||||
"""
|
||||
@@ -1439,7 +1439,7 @@ class NeonEnv:
|
||||
except Exception as e:
|
||||
raise_later = e
|
||||
|
||||
self.object_storage.stop(immediate=immediate)
|
||||
self.endpoint_storage.stop(immediate=immediate)
|
||||
|
||||
# Stop storage controller before pageservers: we don't want it to spuriously
|
||||
# detect a pageserver "failure" during test teardown
|
||||
@@ -2660,24 +2660,24 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.stop(immediate=True)
|
||||
|
||||
|
||||
class ObjectStorage(LogUtils):
|
||||
class EndpointStorage(LogUtils):
|
||||
def __init__(self, env: NeonEnv):
|
||||
service_dir = env.repo_dir / "object_storage"
|
||||
super().__init__(logfile=service_dir / "object_storage.log")
|
||||
self.conf_path = service_dir / "object_storage.json"
|
||||
service_dir = env.repo_dir / "endpoint_storage"
|
||||
super().__init__(logfile=service_dir / "endpoint_storage.log")
|
||||
self.conf_path = service_dir / "endpoint_storage.json"
|
||||
self.env = env
|
||||
|
||||
def base_url(self):
|
||||
return json.loads(self.conf_path.read_text())["listen"]
|
||||
|
||||
def start(self, timeout_in_seconds: int | None = None):
|
||||
self.env.neon_cli.object_storage_start(timeout_in_seconds)
|
||||
self.env.neon_cli.endpoint_storage_start(timeout_in_seconds)
|
||||
|
||||
def stop(self, immediate: bool = False):
|
||||
self.env.neon_cli.object_storage_stop(immediate)
|
||||
self.env.neon_cli.endpoint_storage_stop(immediate)
|
||||
|
||||
def assert_no_errors(self):
|
||||
assert_no_errors(self.logfile, "object_storage", [])
|
||||
assert_no_errors(self.logfile, "endpoint_storage", [])
|
||||
|
||||
|
||||
class NeonProxiedStorageController(NeonStorageController):
|
||||
|
||||
@@ -65,7 +65,7 @@ def test_ro_replica_lag(
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
error_occurred = False
|
||||
try:
|
||||
@@ -198,7 +198,7 @@ def test_replication_start_stop(
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
try:
|
||||
branch_id = project["branch"]["id"]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import tarfile
|
||||
from typing import TYPE_CHECKING
|
||||
@@ -58,7 +59,18 @@ def test_remote_extensions(
|
||||
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
|
||||
|
||||
build_tag = os.environ.get("BUILD_TAG", "latest")
|
||||
archive_route = f"{build_tag}/v{pg_version}/extensions/test_extension.tar.zst"
|
||||
|
||||
# We have decided to use the Go naming convention due to Kubernetes.
|
||||
arch = platform.machine()
|
||||
match arch:
|
||||
case "aarch64":
|
||||
arch = "arm64"
|
||||
case "x86_64":
|
||||
arch = "amd64"
|
||||
case _:
|
||||
pass
|
||||
|
||||
archive_route = f"{build_tag}/{arch}/v{pg_version}/extensions/test_extension.tar.zst"
|
||||
tarball = test_output_dir / "test_extension.tar"
|
||||
extension_dir = (
|
||||
base_dir / "test_runner" / "regress" / "data" / "test_remote_extensions" / "test_extension"
|
||||
|
||||
@@ -8,7 +8,7 @@ from jwcrypto import jwk, jwt
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
|
||||
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Inserts, retrieves, and deletes test file using a JWT token
|
||||
"""
|
||||
@@ -31,7 +31,7 @@ async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
|
||||
token.make_signed_token(key)
|
||||
token = token.serialize()
|
||||
|
||||
base_url = env.object_storage.base_url()
|
||||
base_url = env.endpoint_storage.base_url()
|
||||
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
log.info(f"cache key url {key}")
|
||||
@@ -138,7 +138,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
|
||||
env.neon_cli.pageserver_stop(env.pageserver.id)
|
||||
env.neon_cli.safekeeper_stop()
|
||||
env.neon_cli.storage_controller_stop(False)
|
||||
env.neon_cli.object_storage_stop(False)
|
||||
env.neon_cli.endpoint_storage_stop(False)
|
||||
env.neon_cli.storage_broker_stop()
|
||||
|
||||
# Keep NeonEnv state up to date, it usually owns starting/stopping services
|
||||
@@ -185,7 +185,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
|
||||
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
|
||||
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
|
||||
|
||||
env.neon_cli.object_storage_stop(False)
|
||||
env.neon_cli.endpoint_storage_stop(False)
|
||||
|
||||
# Stop this to get out of the way of the following `start`
|
||||
env.neon_cli.storage_controller_stop(False)
|
||||
|
||||
@@ -95,7 +95,7 @@ def test_storage_controller_smoke(
|
||||
env.pageservers[1].start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
env.object_storage.start()
|
||||
env.endpoint_storage.start()
|
||||
|
||||
# The pageservers we started should have registered with the sharding service on startup
|
||||
nodes = env.storage_controller.node_list()
|
||||
@@ -347,7 +347,7 @@ def prepare_onboarding_env(
|
||||
env = neon_env_builder.init_configs()
|
||||
env.broker.start()
|
||||
env.storage_controller.start()
|
||||
env.object_storage.start()
|
||||
env.endpoint_storage.start()
|
||||
|
||||
# This is the pageserver where we'll initially create the tenant. Run it in emergency
|
||||
# mode so that it doesn't talk to storage controller, and do not register it.
|
||||
@@ -1612,16 +1612,18 @@ def test_storage_controller_heartbeats(
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
# Default log allow list permits connection errors, but this test will use error responses on
|
||||
# the utilization endpoint.
|
||||
env.storage_controller.allowed_errors.append(
|
||||
".*Call to node.*management API.*failed.*failpoint.*"
|
||||
)
|
||||
# The server starts listening to the socket before sending re-attach request,
|
||||
# but it starts serving HTTP only when re-attach is completed.
|
||||
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
|
||||
env.storage_controller.allowed_errors.append(
|
||||
".*Call to node.*management API.*failed.* Timeout.*"
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
# Default log allow list permits connection errors, but this test will use error responses on
|
||||
# the utilization endpoint.
|
||||
".*Call to node.*management API.*failed.*failpoint.*",
|
||||
# The server starts listening to the socket before sending re-attach request,
|
||||
# but it starts serving HTTP only when re-attach is completed.
|
||||
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
|
||||
".*Call to node.*management API.*failed.* Timeout.*",
|
||||
# We will intentionally cause reconcile errors
|
||||
".*Reconcile error.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Initially we have two online pageservers
|
||||
@@ -4240,6 +4242,63 @@ def test_storcon_create_delete_sk_down(
|
||||
wait_until(timeline_deleted_on_sk)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("PG version is not interesting here")
|
||||
@pytest.mark.parametrize("num_safekeepers", [1, 2, 3])
|
||||
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
|
||||
def test_storcon_few_sk(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
num_safekeepers: int,
|
||||
deletetion_subject: DeletionSubject,
|
||||
):
|
||||
"""
|
||||
Test that the storcon can create and delete tenants and timelines with a limited/special number of safekeepers
|
||||
- num_safekeepers: number of safekeepers.
|
||||
- deletion_subject: test that both single timeline and whole tenant deletion work.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_safekeepers = num_safekeepers
|
||||
safekeeper_list = list(range(1, num_safekeepers + 1))
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id)
|
||||
child_timeline_id = env.create_branch("child_of_main", tenant_id)
|
||||
|
||||
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create(
|
||||
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
|
||||
) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
if deletetion_subject is DeletionSubject.TENANT:
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
else:
|
||||
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
|
||||
|
||||
# ensure that there is log msgs for the third safekeeper too
|
||||
def timeline_deleted_on_sk():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_sk)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("wrong_az", [True, False])
|
||||
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user