Compare commits

..

5 Commits

Author SHA1 Message Date
Conrad Ludgate  9cffb16463  abstract byte handling  2025-05-13 14:45:21 +01:00
Conrad Ludgate  b2e0ab5dc6  cleanup  2025-05-12 18:16:15 +01:00
Conrad Ludgate  c55742b437  embed into conn tracking into copy_bidirectional  2025-05-12 18:01:52 +01:00
Folke Behrens  431a12acba  proxy/conntrack: Global connection tracking table and debug logging  2025-04-21 20:02:10 +02:00
Folke Behrens  fd07ecf58f  proxy/conntrack: Add mechanics to track connection state  2025-04-21 19:54:01 +02:00
45 changed files with 895 additions and 719 deletions

View File

@@ -19,7 +19,7 @@
!pageserver/
!pgxn/
!proxy/
!endpoint_storage/
!object_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

View File

@@ -133,7 +133,6 @@ runs:
fi
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
@@ -210,12 +209,11 @@ runs:
--verbose \
-rA $TEST_SELECTION $EXTRA_PARAMS
- name: Upload performance report
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
shell: bash -euxo pipefail {0}
run: |
export REPORT_FROM="${PERF_REPORT_DIR}"
scripts/generate_and_push_perf_report.sh
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO="$PLATFORM"
scripts/generate_and_push_perf_report.sh
fi
- name: Upload compatibility snapshot
# Note that we use `github.base_ref`, which is the target branch for a PR

Cargo.lock (generated), 54 changed lines
View File

@@ -2037,33 +2037,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "endpoint_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "enum-map"
version = "2.5.0"
@@ -4025,6 +3998,33 @@ dependencies = [
"memchr",
]
[[package]]
name = "object_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "once_cell"
version = "1.20.2"

View File

@@ -40,7 +40,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"object_storage",
]
[workspace.package]

View File

@@ -89,7 +89,7 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin endpoint_storage \
--bin object_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -122,7 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -1677,7 +1677,7 @@ RUN set -e \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \

View File

@@ -11,14 +11,6 @@ index bf6edcb..89b4c7f 100644
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -50,14 +42,6 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql

View File

@@ -18,11 +18,12 @@ use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
ObjectStorageConf, SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -92,7 +93,7 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
EndpointStorage(EndpointStorageCmd),
ObjectStorage(ObjectStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
@@ -459,14 +460,14 @@ enum SafekeeperCmd {
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum EndpointStorageCmd {
Start(EndpointStorageStartCmd),
Stop(EndpointStorageStopCmd),
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct EndpointStorageStartCmd {
struct ObjectStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
@@ -474,7 +475,7 @@ struct EndpointStorageStartCmd {
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct EndpointStorageStopCmd {
struct ObjectStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
@@ -796,9 +797,7 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::EndpointStorage(subcmd) => {
rt.block_on(handle_endpoint_storage(&subcmd, env))
}
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -1015,8 +1014,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
endpoint_storage: EndpointStorageConf {
port: ENDPOINT_STORAGE_DEFAULT_PORT,
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1736,15 +1735,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_endpoint_storage(
subcmd: &EndpointStorageCmd,
env: &local_env::LocalEnv,
) -> Result<()> {
use EndpointStorageCmd::*;
let storage = EndpointStorage::from_env(env);
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without endpoint_storage) are present
// old neon binaries (without object_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
@@ -1754,13 +1750,13 @@ async fn handle_endpoint_storage(
}
match subcmd {
Start(EndpointStorageStartCmd { start_timeout }) => {
Start(ObjectStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("endpoint_storage start failed: {e}");
eprintln!("object_storage start failed: {e}");
exit(1);
}
}
Stop(EndpointStorageStopCmd { stop_mode }) => {
Stop(ObjectStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
@@ -1870,10 +1866,10 @@ async fn handle_start_all_impl(
}
js.spawn(async move {
EndpointStorage::from_env(env)
ObjectStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start endpoint_storage"))
.map_err(|e| e.context("start object_storage"))
});
})();
@@ -1972,9 +1968,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = EndpointStorage::from_env(env);
let storage = ObjectStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("endpoint_storage stop failed: {:#}", e);
eprintln!("object_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {

View File

@@ -9,8 +9,8 @@
mod background_process;
pub mod broker;
pub mod endpoint;
pub mod endpoint_storage;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -72,7 +72,7 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller. If set, this will
// be propagated into each pageserver's configuration.
@@ -110,7 +110,7 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -144,7 +144,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
@@ -152,7 +152,7 @@ pub struct NeonLocalInitConf {
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct EndpointStorageConf {
pub struct ObjectStorageConf {
pub port: u16,
}
@@ -413,8 +413,8 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn endpoint_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("endpoint_storage")
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
@@ -450,8 +450,8 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn endpoint_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("endpoint_storage")
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
@@ -615,7 +615,7 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
endpoint_storage,
object_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -632,7 +632,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
endpoint_storage,
object_storage,
}
};
@@ -742,7 +742,7 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
endpoint_storage: self.endpoint_storage.clone(),
object_storage: self.object_storage.clone(),
},
)
}
@@ -849,7 +849,7 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
endpoint_storage,
object_storage,
} = conf;
// Find postgres binaries.
@@ -901,7 +901,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
endpoint_storage,
object_storage,
};
if generate_local_ssl_certs {
@@ -929,13 +929,13 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
EndpointStorage::from_env(&env)
ObjectStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -1,33 +1,34 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct EndpointStorage {
pub struct ObjectStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl EndpointStorage {
pub fn from_env(env: &LocalEnv) -> EndpointStorage {
EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.endpoint_storage.port,
port: env.object_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.json")
self.data_dir.join("object_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
@@ -48,7 +49,7 @@ impl EndpointStorage {
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
@@ -58,19 +59,24 @@ impl EndpointStorage {
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting endpoint_storage at {}", self.listen_addr());
println!("Starting s3 proxy at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
match res.send().await {
Ok(res) => Ok(res.status().is_success()),
Err(_) => Ok(false),
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
}
};
let res = start_process(
"endpoint_storage",
"object_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
@@ -88,14 +94,14 @@ impl EndpointStorage {
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "endpoint_storage", &self.pid_file())
stop_process(immediate, "object_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.log")
self.data_dir.join("object_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.pid")
self.data_dir.join("object_storage.pid")
}
}

View File

@@ -242,22 +242,13 @@ impl RemoteExtSpec {
match self.extension_data.get(real_ext_name) {
Some(_ext_data) => {
// We have decided to use the Go naming convention due to Kubernetes.
let arch = match std::env::consts::ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
arch => arch,
};
// Construct the path to the extension archive
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
let archive_path_str = format!(
"{build_tag}/{arch}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"
);
let archive_path_str =
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
Ok((
real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?,

View File

@@ -1,5 +1,5 @@
[package]
name = "endpoint_storage"
name = "object_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true

View File

@@ -2,7 +2,7 @@ use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
@@ -46,12 +46,12 @@ async fn metrics() -> Result {
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |err| {
if let DownloadError::NotFound = err {
info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(err, &path, "downloading")
internal_error(e, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
@@ -249,7 +249,7 @@ mod tests {
};
let proxy = Storage {
auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
@@ -343,14 +343,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = endpoint_storage::Claims {
let claims = object_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
@@ -364,10 +364,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
// first one is fully valid path, second path is valid for GET as
// read paths may have different endpoint if tenant and timeline matches
// (needed for prewarming RO->RW replica)
.skip(2);
.skip(1);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
@@ -478,16 +475,6 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
requests_chain(chain.into_iter(), |_| token()).await;
}
#[testlog(tokio::test)]
async fn read_other_endpoint_data() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
@@ -495,7 +482,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
endpoint_id: Option<object_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
@@ -505,7 +492,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}

View File

@@ -169,19 +169,10 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
// Read paths may have different endpoint ids. For readonly -> readwrite replica
// prewarming, endpoint must read other endpoint's data.
let endpoint_id = if parts.method == axum::http::Method::GET {
claims.endpoint_id.clone()
} else {
path.endpoint_id.clone()
};
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id,
endpoint_id: path.endpoint_id.clone(),
exp: claims.exp,
};
if route != claims {

View File

@@ -1,4 +1,4 @@
//! `endpoint_storage` is a service which provides API for uploading and downloading
//! `object_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
@@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: endpoint_storage config.json")
anyhow::bail!("Usage: object_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = endpoint_storage::JwtAuth::new(&pemfile)?;
let auth = object_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
let proxy = std::sync::Arc::new(object_storage::Storage {
auth,
storage,
cancel: cancel.clone(),
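
Not part of this diff: a rough client-side sketch of how a caller might talk to the service described at the top of this file, assuming the route shape exercised by the tests earlier in this compare (/{tenant}/{timeline}/{endpoint}/{key} behind a Bearer JWT) and the unauthenticated /metrics endpoint used as a readiness probe. The names base, jwt, and key are placeholders, not identifiers from this change.

// Hypothetical caller sketch; uses only well-known reqwest APIs.
async fn fetch_prewarm_object(
    base: &str,      // e.g. "http://127.0.0.1:9993" (the default local port)
    jwt: &str,       // token whose claims carry tenant_id/timeline_id/endpoint_id
    tenant: &str,
    timeline: &str,
    endpoint: &str,
    key: &str,
) -> anyhow::Result<Vec<u8>> {
    let client = reqwest::Client::new();
    // The metrics endpoint needs no auth and doubles as a liveness probe.
    client.get(format!("{base}/metrics")).send().await?.error_for_status()?;
    // Object reads are scoped by tenant/timeline/endpoint and authorized via the JWT claims.
    let resp = client
        .get(format!("{base}/{tenant}/{timeline}/{endpoint}/{key}"))
        .bearer_auth(jwt)
        .send()
        .await?
        .error_for_status()?;
    Ok(resp.bytes().await?.to_vec())
}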

View File

@@ -263,9 +263,7 @@ where
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
let timelines = tenant.list_timelines();
let timelines_len = timelines.len();
for timeline in timelines {
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
@@ -291,11 +289,6 @@ where
tenant_resident_size += timeline.resident_physical_size();
}
if timelines_len == 0 {
// Force set it to 1 byte to avoid not being reported -- all timelines are offloaded.
tenant_resident_size = 1;
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}

View File

@@ -1285,10 +1285,6 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),

View File

@@ -803,13 +803,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
#ifdef DEBUG_COMPARE_LOCAL
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
if (forkNum == MAIN_FORKNUM)
mdcreate(reln, INIT_FORKNUM, true);
#else
mdcreate(reln, forkNum, isRedo);
#endif
return;
default:
@@ -1979,10 +1973,6 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, INIT_FORKNUM, true);
#endif
return;
default:
@@ -2005,14 +1995,12 @@ neon_start_unlogged_build(SMgrRelation reln)
* FIXME: should we pass isRedo true to create the tablespace dir if it
* doesn't exist? Is it needed?
*/
if (!IsParallelWorker())
{
#ifndef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, MAIN_FORKNUM, false);
#else
mdcreate(reln, INIT_FORKNUM, true);
mdcreate(reln, INIT_FORKNUM, false);
#endif
}
}
/*
@@ -2111,12 +2099,12 @@ neon_end_unlogged_build(SMgrRelation reln)
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
#else
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
#ifdef DEBUG_COMPARE_LOCAL
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}

View File

@@ -297,7 +297,7 @@ async fn handle_client(
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
match copy_bidirectional_client_compute(&mut tls_stream, &mut client).await {
match copy_bidirectional_client_compute(&mut tls_stream, &mut client, |_, _| {}).await {
Ok(_) => Ok(()),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),

View File

@@ -24,6 +24,7 @@ use crate::config::{
use crate::context::parquet::ParquetUploadArgs;
use crate::http::health_server::AppMetrics;
use crate::metrics::Metrics;
use crate::proxy::conntrack::ConnectionTracking;
use crate::rate_limiter::{
EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo, WakeComputeRateLimiter,
};
@@ -418,6 +419,8 @@ pub async fn run() -> anyhow::Result<()> {
64,
));
let conntracking = Arc::new(ConnectionTracking::default());
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
@@ -431,6 +434,7 @@ pub async fn run() -> anyhow::Result<()> {
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
conntracking.clone(),
));
}
@@ -453,6 +457,7 @@ pub async fn run() -> anyhow::Result<()> {
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
conntracking.clone(),
));
}
}

View File

@@ -13,6 +13,7 @@ use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::conntrack::ConnectionTracking;
use crate::proxy::handshake::{HandshakeData, handshake};
use crate::proxy::passthrough::ProxyPassthrough;
use crate::proxy::{
@@ -25,6 +26,7 @@ pub async fn task_main(
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandler>,
conntracking: Arc<ConnectionTracking>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -50,6 +52,7 @@ pub async fn task_main(
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let cancellations = cancellations.clone();
let conntracking = Arc::clone(&conntracking);
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
@@ -111,6 +114,7 @@ pub async fn task_main(
socket,
conn_gauge,
cancellations,
conntracking,
)
.instrument(ctx.span())
.boxed()
@@ -167,6 +171,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
conn_gauge: NumClientConnectionsGuard<'static>,
cancellations: tokio_util::task::task_tracker::TaskTracker,
conntracking: Arc<ConnectionTracking>,
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
debug!(
protocol = %ctx.protocol(),
@@ -264,6 +269,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
compute: node,
session_id: ctx.session_id(),
cancel: session,
conntracking,
_req: request_gauge,
_conn: conn_gauge,
}))

View File

@@ -91,7 +91,6 @@ mod jemalloc;
mod logging;
mod metrics;
mod parse;
mod pglb;
mod protocol2;
mod proxy;
mod rate_limiter;

View File

@@ -200,8 +200,10 @@ pub enum HttpDirection {
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
Tx,
Rx,
#[label(rename = "tx")]
ComputeToClient,
#[label(rename = "rx")]
ClientToCompute,
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]

View File

@@ -1,193 +0,0 @@
#![allow(dead_code, reason = "TODO: work in progress")]
use std::pin::{Pin, pin};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{fmt, io};
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream, ReadBuf};
use tokio::sync::mpsc;
const STREAM_CHANNEL_SIZE: usize = 16;
const MAX_STREAM_BUFFER_SIZE: usize = 4096;
#[derive(Debug)]
pub struct Connection {
stream_sender: mpsc::Sender<Stream>,
stream_receiver: mpsc::Receiver<Stream>,
stream_id_counter: Arc<AtomicUsize>,
}
impl Connection {
pub fn new() -> (Connection, Connection) {
let (sender_a, receiver_a) = mpsc::channel(STREAM_CHANNEL_SIZE);
let (sender_b, receiver_b) = mpsc::channel(STREAM_CHANNEL_SIZE);
let stream_id_counter = Arc::new(AtomicUsize::new(1));
let conn_a = Connection {
stream_sender: sender_a,
stream_receiver: receiver_b,
stream_id_counter: Arc::clone(&stream_id_counter),
};
let conn_b = Connection {
stream_sender: sender_b,
stream_receiver: receiver_a,
stream_id_counter,
};
(conn_a, conn_b)
}
#[inline]
fn next_stream_id(&self) -> StreamId {
StreamId(self.stream_id_counter.fetch_add(1, Ordering::Relaxed))
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn open_stream(&self) -> io::Result<Stream> {
let (local, remote) = tokio::io::duplex(MAX_STREAM_BUFFER_SIZE);
let stream_id = self.next_stream_id();
tracing::Span::current().record("stream_id", stream_id.0);
let local = Stream {
inner: local,
id: stream_id,
};
let remote = Stream {
inner: remote,
id: stream_id,
};
self.stream_sender
.send(remote)
.await
.map_err(io::Error::other)?;
Ok(local)
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn accept_stream(&mut self) -> io::Result<Option<Stream>> {
Ok(self.stream_receiver.recv().await.inspect(|stream| {
tracing::Span::current().record("stream_id", stream.id.0);
}))
}
}
#[derive(Copy, Clone, Debug)]
pub struct StreamId(usize);
impl fmt::Display for StreamId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
// TODO: Proper closing. Currently Streams can outlive their Connections.
// Carry WeakSender and check strong_count?
#[derive(Debug)]
pub struct Stream {
inner: DuplexStream,
id: StreamId,
}
impl Stream {
#[inline]
pub fn id(&self) -> StreamId {
self.id
}
}
impl AsyncRead for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
pin!(&mut self.inner).poll_read(cx, buf)
}
}
impl AsyncWrite for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write(cx, buf)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_flush(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_shutdown(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write_vectored(cx, bufs)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}
#[cfg(test)]
mod tests {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::*;
#[tokio::test]
async fn test_simple_roundtrip() {
let (client, mut server) = Connection::new();
let server_task = tokio::spawn(async move {
while let Some(mut stream) = server.accept_stream().await.unwrap() {
tokio::spawn(async move {
let mut buf = [0; 64];
loop {
match stream.read(&mut buf).await.unwrap() {
0 => break,
n => stream.write(&buf[..n]).await.unwrap(),
};
}
});
}
});
let mut stream = client.open_stream().await.unwrap();
stream.write_all(b"hello!").await.unwrap();
let mut buf = [0; 64];
let n = stream.read(&mut buf).await.unwrap();
assert_eq!(n, 6);
assert_eq!(&buf[..n], b"hello!");
drop(stream);
drop(client);
server_task.await.unwrap();
}
}

View File

@@ -1 +0,0 @@
pub mod inprocess;

View File

@@ -0,0 +1,564 @@
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::SystemTime;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ConnId(usize);
#[derive(Default)]
pub struct ConnectionTracking {
conns: clashmap::ClashMap<ConnId, (ConnectionState, SystemTime)>,
}
impl ConnectionTracking {
pub fn new_tracker(self: &Arc<Self>) -> ConnectionTracker<Conn> {
let conn_id = self.new_conn_id();
ConnectionTracker::new(Conn {
conn_id,
tracking: Arc::clone(self),
})
}
fn new_conn_id(&self) -> ConnId {
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
let id = ConnId(NEXT_ID.fetch_add(1, Ordering::Relaxed));
self.conns
.insert(id, (ConnectionState::Idle, SystemTime::now()));
id
}
fn update(&self, conn_id: ConnId, new_state: ConnectionState) {
let new_timestamp = SystemTime::now();
let old_state = self.conns.insert(conn_id, (new_state, new_timestamp));
if let Some((old_state, _old_timestamp)) = old_state {
tracing::debug!(?conn_id, %old_state, %new_state, "conntrack: update");
} else {
tracing::debug!(?conn_id, %new_state, "conntrack: update");
}
}
fn remove(&self, conn_id: ConnId) {
if let Some((_, (old_state, _old_timestamp))) = self.conns.remove(&conn_id) {
tracing::debug!(?conn_id, %old_state, "conntrack: remove");
}
}
}
pub struct Conn {
conn_id: ConnId,
tracking: Arc<ConnectionTracking>,
}
impl StateChangeObserver for Conn {
fn change(&mut self, _old_state: ConnectionState, new_state: ConnectionState) {
match new_state {
ConnectionState::Init
| ConnectionState::Idle
| ConnectionState::Transaction
| ConnectionState::Busy
| ConnectionState::Unknown => self.tracking.update(self.conn_id, new_state),
ConnectionState::Closed => self.tracking.remove(self.conn_id),
}
}
}
/// Called by `ConnectionTracker` whenever the `ConnectionState` changes.
pub trait StateChangeObserver {
/// Called iff the connection's state changed.
fn change(&mut self, old_state: ConnectionState, new_state: ConnectionState);
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ConnectionState {
#[default]
Init = 0,
Idle = 1,
Transaction = 2,
Busy = 3,
Closed = 4,
Unknown = 5,
}
impl fmt::Display for ConnectionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
ConnectionState::Init => f.write_str("init"),
ConnectionState::Idle => f.write_str("idle"),
ConnectionState::Transaction => f.write_str("transaction"),
ConnectionState::Busy => f.write_str("busy"),
ConnectionState::Closed => f.write_str("closed"),
ConnectionState::Unknown => f.write_str("unknown"),
}
}
}
/// Tracks the `ConnectionState` of a connection by inspecting the frontend and
/// backend stream and reacting to specific messages. Used in combination with
/// two `TrackedStream`s.
pub struct ConnectionTracker<SCO: StateChangeObserver> {
state: ConnectionState,
observer: SCO,
}
impl<SCO: StateChangeObserver> Drop for ConnectionTracker<SCO> {
fn drop(&mut self) {
self.observer.change(self.state, ConnectionState::Closed);
}
}
impl<SCO: StateChangeObserver> ConnectionTracker<SCO> {
pub fn new(observer: SCO) -> Self {
ConnectionTracker {
state: ConnectionState::default(),
observer,
}
}
pub fn frontend_message_tag(&mut self, tag: Tag) {
self.update_state(|old_state| Self::state_from_frontend_tag(old_state, tag));
}
pub fn backend_message_tag(&mut self, tag: Tag) {
self.update_state(|old_state| Self::state_from_backend_tag(old_state, tag));
}
fn update_state(&mut self, new_state_fn: impl FnOnce(ConnectionState) -> ConnectionState) {
let old_state = self.state;
let new_state = new_state_fn(old_state);
if old_state != new_state {
self.observer.change(old_state, new_state);
self.state = new_state;
}
}
fn state_from_frontend_tag(_old_state: ConnectionState, fe_tag: Tag) -> ConnectionState {
// Most activity from the client puts connection into busy state.
// Only the server can put a connection back into idle state.
match fe_tag {
Tag::Start | Tag::ReadyForQuery(_) | Tag::Message(_) => ConnectionState::Busy,
Tag::End => ConnectionState::Closed,
Tag::Lost => ConnectionState::Unknown,
}
}
fn state_from_backend_tag(old_state: ConnectionState, be_tag: Tag) -> ConnectionState {
match be_tag {
// Check for RFQ and put connection into idle or idle in transaction state.
Tag::ReadyForQuery(b'I') => ConnectionState::Idle,
Tag::ReadyForQuery(b'T') => ConnectionState::Transaction,
Tag::ReadyForQuery(b'E') => ConnectionState::Transaction,
// We can't put a connection into idle state for unknown RFQ status.
Tag::ReadyForQuery(_) => ConnectionState::Unknown,
// Ignore out-of-band messages from the server.
Tag::NOTICE | Tag::NOTIFICATION_RESPONSE | Tag::PARAMETER_STATUS => old_state,
// All other activity from server puts connection into busy state.
Tag::Start | Tag::Message(_) => ConnectionState::Busy,
Tag::End => ConnectionState::Closed,
Tag::Lost => ConnectionState::Unknown,
}
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tag {
Message(u8),
ReadyForQuery(u8),
Start,
End,
Lost,
}
impl Tag {
const READY_FOR_QUERY: Tag = Tag::Message(b'Z');
const NOTICE: Tag = Tag::Message(b'N');
const NOTIFICATION_RESPONSE: Tag = Tag::Message(b'A');
const PARAMETER_STATUS: Tag = Tag::Message(b'S');
}
impl fmt::Display for Tag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Tag::Start => f.write_str("start"),
Tag::End => f.write_str("end"),
Tag::Lost => f.write_str("lost"),
Tag::Message(tag) => write!(f, "'{}'", tag as char),
Tag::ReadyForQuery(status) => write!(f, "ReadyForQuery:'{}'", status as char),
}
}
}
pub trait TagObserver {
fn observe(&mut self, tag: Tag);
}
impl<F: FnMut(Tag)> TagObserver for F {
fn observe(&mut self, tag: Tag) {
(self)(tag);
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) enum StreamScannerState {
#[allow(dead_code)]
/// Initial state when no message has been read and we are looking for a
/// message without a tag.
Start,
/// Read a message tag.
Tag,
/// Read the length bytes and calculate the total length.
Length {
tag: Tag,
/// Number of bytes missing to know the full length of the message: 0..=4
length_bytes_missing: usize,
/// Total length of the message (without tag) that is calculated as we
/// read the bytes for the length.
calculated_length: usize,
},
/// Read (= skip) the payload.
Payload {
tag: Tag,
/// If this is the first time payload bytes are read. Important when
/// inspecting specific messages, like ReadyForQuery.
first: bool,
/// Number of payload bytes left to read before looking for a new tag.
bytes_to_skip: usize,
},
/// Stream was terminated.
End,
/// Stream ended up in a lost state. We only stop tracking the stream; we do
/// not interrupt it.
Lost,
}
impl StreamScannerState {
pub(super) fn scan_bytes<TO: TagObserver>(&mut self, mut buf: &[u8], observer: &mut TO) {
use StreamScannerState as S;
if matches!(*self, S::End | S::Lost) {
return;
}
if buf.is_empty() {
match *self {
S::Start | S::Tag => {
observer.observe(Tag::End);
*self = S::End;
return;
}
S::Length { .. } | S::Payload { .. } => {
observer.observe(Tag::Lost);
*self = S::Lost;
return;
}
S::End | S::Lost => unreachable!(),
}
}
while !buf.is_empty() {
match *self {
S::Start => {
*self = S::Length {
tag: Tag::Start,
length_bytes_missing: 4,
calculated_length: 0,
};
}
S::Tag => {
let tag = buf.first().copied().expect("buf not empty");
buf = &buf[1..];
*self = S::Length {
tag: Tag::Message(tag),
length_bytes_missing: 4,
calculated_length: 0,
};
}
S::Length {
tag,
mut length_bytes_missing,
mut calculated_length,
} => {
let consume = length_bytes_missing.min(buf.len());
let (length_bytes, remainder) = buf.split_at(consume);
for b in length_bytes {
calculated_length <<= 8;
calculated_length |= *b as usize;
}
buf = remainder;
length_bytes_missing -= consume;
if length_bytes_missing == 0 {
let Some(bytes_to_skip) = calculated_length.checked_sub(4) else {
observer.observe(Tag::Lost);
*self = S::Lost;
return;
};
if bytes_to_skip == 0 {
observer.observe(tag);
*self = S::Tag;
} else {
*self = S::Payload {
tag,
first: true,
bytes_to_skip,
};
}
} else {
*self = S::Length {
tag,
length_bytes_missing,
calculated_length,
};
}
}
S::Payload {
tag,
first,
mut bytes_to_skip,
} => {
let consume = bytes_to_skip.min(buf.len());
bytes_to_skip -= consume;
if bytes_to_skip == 0 {
if tag == Tag::READY_FOR_QUERY && first && consume == 1 {
let status = buf.first().copied().expect("buf not empty");
observer.observe(Tag::ReadyForQuery(status));
} else {
observer.observe(tag);
}
*self = S::Tag;
} else {
*self = S::Payload {
tag,
first: false,
bytes_to_skip,
};
}
buf = &buf[consume..];
}
S::End | S::Lost => unreachable!(),
}
}
}
}
#[cfg(test)]
mod tests {
use std::cell::RefCell;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tokio::io::{AsyncReadExt, BufReader};
use super::*;
#[test]
fn test_stream_scanner() {
let tags = Rc::new(RefCell::new(Vec::new()));
let observer_tags = tags.clone();
let mut observer = move |tag| {
observer_tags.borrow_mut().push(tag);
};
let mut state = StreamScannerState::Start;
state.scan_bytes(&[0, 0], &mut observer);
assert_eq!(tags.borrow().as_slice(), &[]);
assert_eq!(
state,
StreamScannerState::Length {
tag: Tag::Start,
length_bytes_missing: 2,
calculated_length: 0,
}
);
state.scan_bytes(&[0x01, 0x01, 0x00], &mut observer);
assert_eq!(tags.borrow().as_slice(), &[]);
assert_eq!(
state,
StreamScannerState::Payload {
tag: Tag::Start,
first: false,
bytes_to_skip: 0x00000101 - 4 - 1,
}
);
state.scan_bytes(vec![0; 0x00000101 - 4 - 1 - 1].as_slice(), &mut observer);
assert_eq!(tags.borrow().as_slice(), &[]);
assert_eq!(
state,
StreamScannerState::Payload {
tag: Tag::Start,
first: false,
bytes_to_skip: 1,
}
);
state.scan_bytes(&[0x00, b'A', 0x00, 0x00, 0x00, 0x08], &mut observer);
assert_eq!(tags.borrow().as_slice(), &[Tag::Start]);
assert_eq!(
state,
StreamScannerState::Payload {
tag: Tag::Message(b'A'),
first: true,
bytes_to_skip: 4,
}
);
state.scan_bytes(&[0, 0, 0, 0], &mut observer);
assert_eq!(tags.borrow().as_slice(), &[Tag::Start, Tag::Message(b'A')]);
assert_eq!(state, StreamScannerState::Tag);
state.scan_bytes(&[b'Z', 0x00, 0x00, 0x00, 0x05, b'T'], &mut observer);
assert_eq!(
tags.borrow().as_slice(),
&[Tag::Start, Tag::Message(b'A'), Tag::ReadyForQuery(b'T')]
);
assert_eq!(state, StreamScannerState::Tag);
state.scan_bytes(&[], &mut observer);
assert_eq!(
tags.borrow().as_slice(),
&[
Tag::Start,
Tag::Message(b'A'),
Tag::ReadyForQuery(b'T'),
Tag::End
]
);
assert_eq!(state, StreamScannerState::End);
}
#[tokio::test]
async fn test_connection_tracker() {
let transitions: Arc<Mutex<Vec<(ConnectionState, ConnectionState)>>> = Arc::default();
struct Observer(Arc<Mutex<Vec<(ConnectionState, ConnectionState)>>>);
impl StateChangeObserver for Observer {
fn change(&mut self, old_state: ConnectionState, new_state: ConnectionState) {
self.0.lock().unwrap().push((old_state, new_state));
}
}
let mut tracker = ConnectionTracker::new(Observer(transitions.clone()));
let stream = BufReader::new(
&[
0, 0, 0, 4, // Init
b'Z', 0, 0, 0, 5, b'I', // Init -> Idle
b'x', 0, 0, 0, 4, // Idle -> Busy
b'Z', 0, 0, 0, 5, b'I', // Busy -> Idle
][..],
);
// AsyncRead
let mut stream = TrackedStream::new(stream, |tag| tracker.backend_message_tag(tag));
let mut readbuf = [0; 2];
let n = stream.read_exact(&mut readbuf).await.unwrap();
assert_eq!(n, 2);
assert_eq!(&readbuf, &[0, 0,]);
assert!(transitions.lock().unwrap().is_empty());
let mut readbuf = [0; 2];
let n = stream.read_exact(&mut readbuf).await.unwrap();
assert_eq!(n, 2);
assert_eq!(&readbuf, &[0, 4]);
assert_eq!(
transitions.lock().unwrap().as_slice(),
&[(ConnectionState::Init, ConnectionState::Busy)]
);
let mut readbuf = [0; 6];
let n = stream.read_exact(&mut readbuf).await.unwrap();
assert_eq!(n, 6);
assert_eq!(&readbuf, &[b'Z', 0, 0, 0, 5, b'I']);
assert_eq!(
transitions.lock().unwrap().as_slice(),
&[
(ConnectionState::Init, ConnectionState::Busy),
(ConnectionState::Busy, ConnectionState::Idle),
]
);
let mut readbuf = [0; 5];
let n = stream.read_exact(&mut readbuf).await.unwrap();
assert_eq!(n, 5);
assert_eq!(&readbuf, &[b'x', 0, 0, 0, 4]);
assert_eq!(
transitions.lock().unwrap().as_slice(),
&[
(ConnectionState::Init, ConnectionState::Busy),
(ConnectionState::Busy, ConnectionState::Idle),
(ConnectionState::Idle, ConnectionState::Busy),
]
);
let mut readbuf = [0; 6];
let n = stream.read_exact(&mut readbuf).await.unwrap();
assert_eq!(n, 6);
assert_eq!(&readbuf, &[b'Z', 0, 0, 0, 5, b'I']);
assert_eq!(
transitions.lock().unwrap().as_slice(),
&[
(ConnectionState::Init, ConnectionState::Busy),
(ConnectionState::Busy, ConnectionState::Idle),
(ConnectionState::Idle, ConnectionState::Busy),
(ConnectionState::Busy, ConnectionState::Idle),
]
);
}
pub struct TrackedStream<S, TO> {
stream: S,
observer: TO,
state: StreamScannerState,
}
impl<S: Unpin, TO> Unpin for TrackedStream<S, TO> {}
impl<S: AsyncRead + Unpin, TO: TagObserver> TrackedStream<S, TO> {
pub const fn new(stream: S, observer: TO) -> Self {
TrackedStream {
stream,
observer,
state: StreamScannerState::Start,
}
}
}
impl<S: AsyncRead + Unpin, TO: TagObserver> AsyncRead for TrackedStream<S, TO> {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let Self {
stream,
observer,
state,
} = Pin::into_inner(self);
let old_len = buf.filled().len();
match Pin::new(stream).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let new_len = buf.filled().len();
state.scan_bytes(&buf.filled()[old_len..new_len], observer);
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}
}
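
An aside, not part of this change: the framing that StreamScannerState walks is the standard Postgres wire format, where each regular message is a one-byte tag followed by a four-byte big-endian length that counts the length field itself plus the payload but not the tag, and where the startup packet carries no tag at all (modeled above as Tag::Start). A minimal sketch of the ReadyForQuery message used throughout the tests:

fn encode_ready_for_query(status: u8) -> Vec<u8> {
    let mut msg = vec![b'Z'];                     // message tag
    msg.extend_from_slice(&5u32.to_be_bytes());   // length = 4 (length field) + 1 (status byte)
    msg.push(status);                             // b'I' idle, b'T' in transaction, b'E' failed transaction
    msg                                           // => [b'Z', 0, 0, 0, 5, status], as asserted in the tests above
}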

View File

@@ -6,6 +6,8 @@ use std::task::{Context, Poll, ready};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::info;
use crate::metrics::Direction;
#[derive(Debug)]
enum TransferState {
Running(CopyBuffer),
@@ -43,6 +45,8 @@ pub enum ErrorSource {
fn transfer_one_direction<A, B>(
cx: &mut Context<'_>,
state: &mut TransferState,
direction: Direction,
conn_tracker: &mut impl for<'a> FnMut(Direction, &'a [u8]),
r: &mut A,
w: &mut B,
) -> Poll<Result<u64, ErrorDirection>>
@@ -55,7 +59,8 @@ where
loop {
match state {
TransferState::Running(buf) => {
let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
let count =
ready!(buf.poll_copy(cx, direction, conn_tracker, r.as_mut(), w.as_mut()))?;
*state = TransferState::ShuttingDown(count);
}
TransferState::ShuttingDown(count) => {
@@ -71,6 +76,7 @@ where
pub async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,
mut conn_tracker: impl for<'a> FnMut(Direction, &'a [u8]),
) -> Result<(u64, u64), ErrorSource>
where
Client: AsyncRead + AsyncWrite + Unpin + ?Sized,
@@ -80,12 +86,24 @@ where
let mut compute_to_client = TransferState::Running(CopyBuffer::new());
poll_fn(|cx| {
let mut client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)
.map_err(ErrorSource::from_client)?;
let mut compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
let mut client_to_compute_result = transfer_one_direction(
cx,
&mut client_to_compute,
Direction::ClientToCompute,
&mut conn_tracker,
client,
compute,
)
.map_err(ErrorSource::from_client)?;
let mut compute_to_client_result = transfer_one_direction(
cx,
&mut compute_to_client,
Direction::ComputeToClient,
&mut conn_tracker,
compute,
client,
)
.map_err(ErrorSource::from_compute)?;
// TODO: 1 info log, with an enum label for close direction.
@@ -95,9 +113,15 @@ where
info!("Compute is done, terminate client");
// Initiate shutdown
client_to_compute = TransferState::ShuttingDown(buf.amt);
client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)
.map_err(ErrorSource::from_client)?;
client_to_compute_result = transfer_one_direction(
cx,
&mut client_to_compute,
Direction::ClientToCompute,
&mut conn_tracker,
client,
compute,
)
.map_err(ErrorSource::from_client)?;
}
}
@@ -107,9 +131,15 @@ where
info!("Client is done, terminate compute");
// Initiate shutdown
compute_to_client = TransferState::ShuttingDown(buf.amt);
compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
compute_to_client_result = transfer_one_direction(
cx,
&mut compute_to_client,
Direction::ComputeToClient,
&mut conn_tracker,
compute,
client,
)
.map_err(ErrorSource::from_compute)?;
}
}
@@ -148,6 +178,8 @@ impl CopyBuffer {
fn poll_fill_buf<R>(
&mut self,
cx: &mut Context<'_>,
direction: Direction,
conn_tracker: &mut impl for<'a> FnMut(Direction, &'a [u8]),
reader: Pin<&mut R>,
) -> Poll<io::Result<()>>
where
@@ -158,6 +190,8 @@ impl CopyBuffer {
buf.set_filled(me.cap);
let res = reader.poll_read(cx, &mut buf);
conn_tracker(direction, &buf.filled()[me.cap..]);
if let Poll::Ready(Ok(())) = res {
let filled_len = buf.filled().len();
me.read_done = me.cap == filled_len;
@@ -169,6 +203,8 @@ impl CopyBuffer {
fn poll_write_buf<R, W>(
&mut self,
cx: &mut Context<'_>,
direction: Direction,
conn_tracker: &mut impl for<'a> FnMut(Direction, &'a [u8]),
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
) -> Poll<Result<usize, ErrorDirection>>
@@ -182,7 +218,8 @@ impl CopyBuffer {
// Top up the buffer towards full if we can read a bit more
// data - this should improve the chances of a large write
if !me.read_done && me.cap < me.buf.len() {
ready!(me.poll_fill_buf(cx, reader.as_mut())).map_err(ErrorDirection::Read)?;
ready!(me.poll_fill_buf(cx, direction, conn_tracker, reader.as_mut()))
.map_err(ErrorDirection::Read)?;
}
Poll::Pending
}
@@ -193,6 +230,8 @@ impl CopyBuffer {
pub(super) fn poll_copy<R, W>(
&mut self,
cx: &mut Context<'_>,
direction: Direction,
conn_tracker: &mut impl for<'a> FnMut(Direction, &'a [u8]),
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
) -> Poll<Result<u64, ErrorDirection>>
@@ -204,7 +243,7 @@ impl CopyBuffer {
// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
if self.cap < self.buf.len() && !self.read_done {
match self.poll_fill_buf(cx, reader.as_mut()) {
match self.poll_fill_buf(cx, direction, conn_tracker, reader.as_mut()) {
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => return Poll::Ready(Err(ErrorDirection::Read(err))),
Poll::Pending => {
@@ -227,7 +266,13 @@ impl CopyBuffer {
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
let i = ready!(self.poll_write_buf(
cx,
direction,
conn_tracker,
reader.as_mut(),
writer.as_mut()
))?;
if i == 0 {
return Poll::Ready(Err(ErrorDirection::Write(io::Error::new(
io::ErrorKind::WriteZero,
@@ -278,9 +323,10 @@ mod tests {
compute_client.write_all(b"Neon").await.unwrap();
compute_client.shutdown().await.unwrap();
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
let result =
copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy, |_, _| {})
.await
.unwrap();
// Assert correct transferred amounts
let (client_to_compute_count, compute_to_client_count) = result;
@@ -301,9 +347,10 @@ mod tests {
.await
.unwrap();
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
let result =
copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy, |_, _| {})
.await
.unwrap();
// Assert correct transferred amounts
let (client_to_compute_count, compute_to_client_count) = result;

View File

@@ -2,6 +2,7 @@
mod tests;
pub(crate) mod connect_compute;
pub mod conntrack;
mod copy_bidirectional;
pub(crate) mod handshake;
pub(crate) mod passthrough;
@@ -30,6 +31,7 @@ use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::conntrack::ConnectionTracking;
use crate::proxy::handshake::{HandshakeData, handshake};
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
@@ -60,6 +62,7 @@ pub async fn task_main(
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandler>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conntracking: Arc<ConnectionTracking>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -85,6 +88,7 @@ pub async fn task_main(
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let cancellations = cancellations.clone();
let conntracking = Arc::clone(&conntracking);
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
@@ -149,6 +153,7 @@ pub async fn task_main(
endpoint_rate_limiter2,
conn_gauge,
cancellations,
conntracking,
)
.instrument(ctx.span())
.boxed()
@@ -268,6 +273,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
cancellations: tokio_util::task::task_tracker::TaskTracker,
conntracking: Arc<ConnectionTracking>,
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
debug!(
protocol = %ctx.protocol(),
@@ -409,6 +415,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
compute: node,
session_id: ctx.session_id(),
cancel: session,
conntracking,
_req: request_gauge,
_conn: conn_gauge,
}))

View File

@@ -1,7 +1,8 @@
use std::sync::Arc;
use smol_str::SmolStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::debug;
use utils::measured_stream::MeasuredStream;
use super::copy_bidirectional::ErrorSource;
use crate::cancellation;
@@ -9,16 +10,19 @@ use crate::compute::PostgresConnection;
use crate::config::ComputeConfig;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
use crate::proxy::conntrack::{ConnectionTracking, StreamScannerState};
use crate::proxy::copy_bidirectional::copy_bidirectional_client_compute;
use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(skip_all)]
pub(crate) async fn proxy_pass(
client: impl AsyncRead + AsyncWrite + Unpin,
compute: impl AsyncRead + AsyncWrite + Unpin,
mut client: impl AsyncRead + AsyncWrite + Unpin,
mut compute: impl AsyncRead + AsyncWrite + Unpin,
aux: MetricsAuxInfo,
private_link_id: Option<SmolStr>,
conntracking: &Arc<ConnectionTracking>,
) -> Result<(), ErrorSource> {
// we will report ingress at a later date
let usage_tx = USAGE_METRICS.register(Ids {
@@ -27,35 +31,35 @@ pub(crate) async fn proxy_pass(
private_link_id,
});
let mut conn_tracker = conntracking.new_tracker();
let metrics = &Metrics::get().proxy.io_bytes;
let m_sent = metrics.with_labels(Direction::Tx);
let mut client = MeasuredStream::new(
client,
|_| {},
|cnt| {
// Number of bytes we sent to the client (outbound).
metrics.get_metric(m_sent).inc_by(cnt as u64);
usage_tx.record_egress(cnt as u64);
},
);
let m_sent = metrics.with_labels(Direction::ComputeToClient);
let m_recv = metrics.with_labels(Direction::ClientToCompute);
let m_recv = metrics.with_labels(Direction::Rx);
let mut compute = MeasuredStream::new(
compute,
|_| {},
|cnt| {
// Number of bytes the client sent to the compute node (inbound).
metrics.get_metric(m_recv).inc_by(cnt as u64);
usage_tx.record_ingress(cnt as u64);
},
);
let mut client_to_compute = StreamScannerState::Tag;
let mut compute_to_client = StreamScannerState::Tag;
// Starting from here we only proxy the client's traffic.
debug!("performing the proxy pass...");
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
&mut client,
&mut compute,
)
let _ = copy_bidirectional_client_compute(&mut client, &mut compute, |direction, bytes| {
match direction {
Direction::ClientToCompute => {
client_to_compute
.scan_bytes(bytes, &mut |tag| conn_tracker.frontend_message_tag(tag));
metrics.get_metric(m_recv).inc_by(bytes.len() as u64);
usage_tx.record_ingress(bytes.len() as u64);
}
Direction::ComputeToClient => {
compute_to_client
.scan_bytes(bytes, &mut |tag| conn_tracker.backend_message_tag(tag));
metrics.get_metric(m_sent).inc_by(bytes.len() as u64);
usage_tx.record_egress(bytes.len() as u64);
}
}
})
.await?;
Ok(())
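The hunk above replaces the MeasuredStream wrappers with a per-direction StreamScannerState that is fed every forwarded chunk and reports the Postgres message tags it finds to the connection tracker, alongside the existing metrics and usage counters. As a hedged illustration of that kind of chunk-tolerant scanner (the enum, names, and simplified framing below are hypothetical, assuming the regular Postgres wire layout of a 1-byte tag followed by a 4-byte big-endian length that includes itself, and ignoring the untagged startup packet):

```rust
// Sketch of a state machine that recovers message tags from a byte stream
// delivered in arbitrary chunks. Not the code from this PR.
enum Scanner {
    Tag,                               // expecting the next message tag
    Len { buf: [u8; 4], have: usize }, // collecting the 4-byte length
    Skip { left: usize },              // skipping the message payload
}

impl Scanner {
    fn scan_bytes(&mut self, mut bytes: &[u8], on_tag: &mut impl FnMut(u8)) {
        while !bytes.is_empty() {
            match self {
                Scanner::Tag => {
                    on_tag(bytes[0]);
                    bytes = &bytes[1..];
                    *self = Scanner::Len { buf: [0; 4], have: 0 };
                }
                Scanner::Len { buf, have } => {
                    let take = (4 - *have).min(bytes.len());
                    buf[*have..*have + take].copy_from_slice(&bytes[..take]);
                    *have += take;
                    bytes = &bytes[take..];
                    if *have == 4 {
                        // The length field counts itself, so the payload is 4 bytes shorter.
                        let len = u32::from_be_bytes(*buf) as usize;
                        *self = Scanner::Skip { left: len.saturating_sub(4) };
                    }
                }
                Scanner::Skip { left } => {
                    let take = (*left).min(bytes.len());
                    *left -= take;
                    bytes = &bytes[take..];
                    if *left == 0 {
                        *self = Scanner::Tag;
                    }
                }
            }
        }
    }
}

fn main() {
    // A fake 'Q' message (tag, length 10, six payload bytes) split across two chunks.
    let msg = [b'Q', 0, 0, 0, 10, 1, 2, 3, 4, 5, 6];
    let mut scanner = Scanner::Tag;
    let mut tags = Vec::new();
    scanner.scan_bytes(&msg[..3], &mut |t| tags.push(t));
    scanner.scan_bytes(&msg[3..], &mut |t| tags.push(t));
    assert_eq!(tags, vec![b'Q']);
}
```

Keeping one scanner per direction means chunk boundaries never need to align with message boundaries, so the tracker can observe frontend and backend tags straight out of the copy loop.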
@@ -68,6 +72,7 @@ pub(crate) struct ProxyPassthrough<S> {
pub(crate) session_id: uuid::Uuid,
pub(crate) private_link_id: Option<SmolStr>,
pub(crate) cancel: cancellation::Session,
pub(crate) conntracking: Arc<ConnectionTracking>,
pub(crate) _req: NumConnectionRequestsGuard<'static>,
pub(crate) _conn: NumClientConnectionsGuard<'static>,
@@ -83,6 +88,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
self.compute.stream,
self.aux,
self.private_link_id,
&self.conntracking,
)
.await;
if let Err(err) = self


View File

@@ -50,6 +50,7 @@ use crate::context::RequestContext;
use crate::ext::TaskExt;
use crate::metrics::Metrics;
use crate::protocol2::{ChainRW, ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::conntrack::ConnectionTracking;
use crate::proxy::run_until_cancelled;
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
@@ -124,6 +125,9 @@ pub async fn task_main(
connections.close(); // allows `connections.wait` to complete
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
let conntracking = Arc::new(ConnectionTracking::default());
while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
let (conn, peer_addr) = res.context("could not accept TCP stream")?;
if let Err(e) = conn.set_nodelay(true) {
@@ -153,6 +157,8 @@ pub async fn task_main(
let cancellation_handler = cancellation_handler.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let cancellations = cancellations.clone();
let conntracking = Arc::clone(&conntracking);
connections.spawn(
async move {
let conn_token2 = conn_token.clone();
@@ -185,6 +191,7 @@ pub async fn task_main(
cancellation_handler,
endpoint_rate_limiter,
conn_token,
conntracking,
conn,
conn_info,
session_id,
@@ -309,6 +316,7 @@ async fn connection_handler(
cancellation_handler: Arc<CancellationHandler>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
conntracking: Arc<ConnectionTracking>,
conn: AsyncRW,
conn_info: ConnectionInfo,
session_id: uuid::Uuid,
@@ -347,6 +355,7 @@ async fn connection_handler(
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
// By spawning the future, we ensure it never gets cancelled until it decides to.
let cancellations = cancellations.clone();
let conntracking = Arc::clone(&conntracking);
let handler = connections.spawn(
request_handler(
req,
@@ -359,6 +368,7 @@ async fn connection_handler(
http_request_token,
endpoint_rate_limiter.clone(),
cancellations,
conntracking,
)
.in_current_span()
.map_ok_or_else(api_error_into_response, |r| r),
@@ -407,6 +417,7 @@ async fn request_handler(
http_cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellations: TaskTracker,
conntracking: Arc<ConnectionTracking>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let host = request
.headers()
@@ -452,6 +463,7 @@ async fn request_handler(
endpoint_rate_limiter,
host,
cancellations,
conntracking,
)
.await
{

View File

@@ -17,6 +17,7 @@ use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::conntrack::ConnectionTracking;
use crate::proxy::{ClientMode, ErrorSource, handle_client};
use crate::rate_limiter::EndpointRateLimiter;
@@ -133,6 +134,7 @@ pub(crate) async fn serve_websocket(
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
hostname: Option<String>,
cancellations: tokio_util::task::task_tracker::TaskTracker,
conntracking: Arc<ConnectionTracking>,
) -> anyhow::Result<()> {
let websocket = websocket.await?;
let websocket = WebSocketServer::after_handshake(TokioIo::new(websocket));
@@ -152,6 +154,7 @@ pub(crate) async fn serve_websocket(
endpoint_rate_limiter,
conn_gauge,
cancellations,
conntracking,
))
.await;

View File

@@ -14,7 +14,6 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::set_build_info_metric;
use remote_storage::RemoteStorageConfig;
use safekeeper::defaults::{
@@ -24,8 +23,8 @@ use safekeeper::defaults::{
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
control_file, http, wal_backup, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};
@@ -216,21 +215,16 @@ struct Args {
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: Duration,
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
pub ssl_ca_file: Option<Utf8PathBuf>,
/// Flag to use https for requests to peer's safekeeper API.
#[arg(long)]
use_https_safekeeper_api: bool,
pub use_https_safekeeper_api: bool,
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
/// Enable TLS in WAL service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
#[arg(long)]
enable_tls_wal_service_api: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -424,7 +418,6 @@ async fn main() -> anyhow::Result<()> {
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_certs,
use_https_safekeeper_api: args.use_https_safekeeper_api,
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
});
// initialize sentry if SENTRY_DSN is provided
@@ -524,36 +517,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
info!("running in current thread runtime");
}
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api {
let ssl_key_file = conf.ssl_key_file.clone();
let ssl_cert_file = conf.ssl_cert_file.clone();
let ssl_cert_reload_period = conf.ssl_cert_reload_period;
// Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
// task is run in this runtime.
let cert_resolver = current_thread_rt
.as_ref()
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
.spawn(async move {
ReloadingCertificateResolver::new(
"main",
&ssl_key_file,
&ssl_cert_file,
ssl_cert_reload_period,
)
.await
})
.await??;
let config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
Some(Arc::new(config))
} else {
None
};
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
@@ -561,9 +524,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
conf.clone(),
pg_listener,
Scope::SafekeeperData,
conf.enable_tls_wal_service_api
.then(|| tls_server_config.clone())
.flatten(),
global_timelines.clone(),
))
// wrap with task name for error reporting
@@ -592,9 +552,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
conf.clone(),
pg_listener_tenant_only,
Scope::Tenant,
conf.enable_tls_wal_service_api
.then(|| tls_server_config.clone())
.flatten(),
global_timelines.clone(),
))
// wrap with task name for error reporting
@@ -620,7 +577,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.spawn(http::task_main_https(
conf.clone(),
https_listener,
tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
global_timelines.clone(),
))
.map(|res| ("HTTPS service main".to_owned(), res));

View File

@@ -1,6 +1,7 @@
pub mod routes;
use std::sync::Arc;
use http_utils::tls_certs::ReloadingCertificateResolver;
pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
@@ -27,10 +28,21 @@ pub async fn task_main_http(
pub async fn task_main_https(
conf: Arc<SafeKeeperConf>,
https_listener: std::net::TcpListener,
tls_config: Arc<rustls::ServerConfig>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
)
.await?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
let router = make_router(conf, global_timelines)
.build()

View File

@@ -122,7 +122,6 @@ pub struct SafeKeeperConf {
pub ssl_cert_reload_period: Duration,
pub ssl_ca_certs: Vec<Pem>,
pub use_https_safekeeper_api: bool,
pub enable_tls_wal_service_api: bool,
}
impl SafeKeeperConf {
@@ -173,7 +172,6 @@ impl SafeKeeperConf {
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
}
}
}
@@ -211,12 +209,3 @@ pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.build()
.expect("Failed to create WAL backup runtime")
});
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background worker")
.worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense
.enable_all()
.build()
.expect("Failed to create background runtime")
});

View File

@@ -29,7 +29,6 @@ pub async fn task_main(
conf: Arc<SafeKeeperConf>,
pg_listener: std::net::TcpListener,
allowed_auth_scope: Scope,
tls_config: Option<Arc<rustls::ServerConfig>>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
// Tokio's from_std won't do this for us, per its comment.
@@ -44,10 +43,9 @@ pub async fn task_main(
let conf = conf.clone();
let conn_id = issue_connection_id(&mut connection_count);
let global_timelines = global_timelines.clone();
let tls_config = tls_config.clone();
tokio::spawn(
async move {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, tls_config, global_timelines).await {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
error!("connection handler exited: {}", err);
}
}
@@ -63,7 +61,6 @@ async fn handle_socket(
conf: Arc<SafeKeeperConf>,
conn_id: ConnectionId,
allowed_auth_scope: Scope,
tls_config: Option<Arc<rustls::ServerConfig>>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<(), QueryError> {
socket.set_nodelay(true)?;
@@ -113,8 +110,7 @@ async fn handle_socket(
auth_pair,
global_timelines,
);
let pgbackend =
PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend

View File

@@ -185,7 +185,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -151,39 +151,11 @@ impl Service {
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
let target_sk_count = timeline_persistence.sk_set.len();
let quorum_size = match target_sk_count {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"timeline configured without any safekeepers",
)));
}
1 | 2 => {
#[cfg(feature = "testing")]
{
// In test settings, it is allowed to have one or two safekeepers
target_sk_count
}
#[cfg(not(feature = "testing"))]
{
// The region is misconfigured: we need at least three safekeepers to be configured
// in order to schedule work to them
tracing::warn!(
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
timeline_persistence.sk_set
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find at least 3 safekeepers to put timeline to"
)));
}
}
_ => target_sk_count / 2 + 1,
};
let success_count = target_sk_count - remaining.len();
if success_count < quorum_size {
if remaining.len() >= 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
"not enough successful reconciliations to reach quorum, please retry: {} errored",
remaining.len()
)));
}
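For context on the arithmetic swapped out above: the removed branch computed a majority quorum of target_sk_count / 2 + 1, while the new check simply fails once two or more reconciliations have errored. A hedged sketch (hypothetical helper, not code from this PR) of how the two line up in the common three-safekeeper case:

```rust
// Majority quorum for n participants: more than half must succeed.
fn quorum(n: usize) -> usize {
    n / 2 + 1
}

fn main() {
    assert_eq!(quorum(3), 2); // one failure tolerated
    assert_eq!(quorum(5), 3); // two failures tolerated

    // The replacement check errors when `remaining.len() >= 2`, i.e. it tolerates
    // at most one failed reconciliation. With the usual three safekeepers that is
    // the same bar as the old quorum test: success_count >= quorum(3).
    let (target_sk_count, failed) = (3usize, 1usize);
    assert!(target_sk_count - failed >= quorum(target_sk_count));
}
```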
@@ -520,6 +492,8 @@ impl Service {
pub(crate) async fn safekeepers_for_new_timeline(
&self,
) -> Result<Vec<SafekeeperInfo>, ApiError> {
// Number of safekeepers in different AZs we are looking for
let wanted_count = 3;
let mut all_safekeepers = {
let locked = self.inner.read().unwrap();
locked
@@ -558,19 +532,6 @@ impl Service {
sk.1.id.0,
)
});
// Number of safekeepers in different AZs we are looking for
let wanted_count = match all_safekeepers.len() {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find any active safekeeper for new timeline",
)));
}
// Have laxer requirements on testing mode as we don't want to
// spin up three safekeepers for every single test
#[cfg(feature = "testing")]
1 | 2 => all_safekeepers.len(),
_ => 3,
};
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {

View File

@@ -417,14 +417,14 @@ class NeonLocalCli(AbstractNeonCli):
cmd.append(f"--instance-id={instance_id}")
return self.raw_cli(cmd)
def endpoint_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["endpoint-storage", "start"]
def object_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["object-storage", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
return self.raw_cli(cmd)
def endpoint_storage_stop(self, immediate: bool):
cmd = ["endpoint-storage", "stop"]
def object_storage_stop(self, immediate: bool):
cmd = ["object-storage", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
return self.raw_cli(cmd)

View File

@@ -1029,7 +1029,7 @@ class NeonEnvBuilder:
self.env.broker.assert_no_errors()
self.env.endpoint_storage.assert_no_errors()
self.env.object_storage.assert_no_errors()
try:
self.overlay_cleanup_teardown()
@@ -1126,7 +1126,7 @@ class NeonEnv:
pagectl_env_vars["RUST_LOG"] = self.rust_log_override
self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath)
self.endpoint_storage = EndpointStorage(self)
self.object_storage = ObjectStorage(self)
# The URL for the pageserver to use as its control_plane_api config
if config.storage_controller_port_override is not None:
@@ -1183,7 +1183,7 @@ class NeonEnv:
},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"object_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -1420,7 +1420,7 @@ class NeonEnv:
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
self.object_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
"""
@@ -1439,7 +1439,7 @@ class NeonEnv:
except Exception as e:
raise_later = e
self.endpoint_storage.stop(immediate=immediate)
self.object_storage.stop(immediate=immediate)
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
@@ -2660,24 +2660,24 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.stop(immediate=True)
class EndpointStorage(LogUtils):
class ObjectStorage(LogUtils):
def __init__(self, env: NeonEnv):
service_dir = env.repo_dir / "endpoint_storage"
super().__init__(logfile=service_dir / "endpoint_storage.log")
self.conf_path = service_dir / "endpoint_storage.json"
service_dir = env.repo_dir / "object_storage"
super().__init__(logfile=service_dir / "object_storage.log")
self.conf_path = service_dir / "object_storage.json"
self.env = env
def base_url(self):
return json.loads(self.conf_path.read_text())["listen"]
def start(self, timeout_in_seconds: int | None = None):
self.env.neon_cli.endpoint_storage_start(timeout_in_seconds)
self.env.neon_cli.object_storage_start(timeout_in_seconds)
def stop(self, immediate: bool = False):
self.env.neon_cli.endpoint_storage_stop(immediate)
self.env.neon_cli.object_storage_stop(immediate)
def assert_no_errors(self):
assert_no_errors(self.logfile, "endpoint_storage", [])
assert_no_errors(self.logfile, "object_storage", [])
class NeonProxiedStorageController(NeonStorageController):

View File

@@ -65,7 +65,7 @@ def test_ro_replica_lag(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
@@ -198,7 +198,7 @@ def test_replication_start_stop(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import os
import platform
import shutil
import tarfile
from typing import TYPE_CHECKING
@@ -59,18 +58,7 @@ def test_remote_extensions(
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
build_tag = os.environ.get("BUILD_TAG", "latest")
# We have decided to use the Go naming convention due to Kubernetes.
arch = platform.machine()
match arch:
case "aarch64":
arch = "arm64"
case "x86_64":
arch = "amd64"
case _:
pass
archive_route = f"{build_tag}/{arch}/v{pg_version}/extensions/test_extension.tar.zst"
archive_route = f"{build_tag}/v{pg_version}/extensions/test_extension.tar.zst"
tarball = test_output_dir / "test_extension.tar"
extension_dir = (
base_dir / "test_runner" / "regress" / "data" / "test_remote_extensions" / "test_extension"

View File

@@ -138,7 +138,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
env.neon_cli.storage_controller_stop(False)
env.neon_cli.endpoint_storage_stop(False)
env.neon_cli.object_storage_stop(False)
env.neon_cli.storage_broker_stop()
# Keep NeonEnv state up to date, it usually owns starting/stopping services
@@ -185,7 +185,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
env.neon_cli.endpoint_storage_stop(False)
env.neon_cli.object_storage_stop(False)
# Stop this to get out of the way of the following `start`
env.neon_cli.storage_controller_stop(False)

View File

@@ -8,7 +8,7 @@ from jwcrypto import jwk, jwt
@pytest.mark.asyncio
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes test file using a JWT token
"""
@@ -31,7 +31,7 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
token.make_signed_token(key)
token = token.serialize()
base_url = env.endpoint_storage.base_url()
base_url = env.object_storage.base_url()
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")

View File

@@ -95,7 +95,7 @@ def test_storage_controller_smoke(
env.pageservers[1].start()
for sk in env.safekeepers:
sk.start()
env.endpoint_storage.start()
env.object_storage.start()
# The pageservers we started should have registered with the sharding service on startup
nodes = env.storage_controller.node_list()
@@ -347,7 +347,7 @@ def prepare_onboarding_env(
env = neon_env_builder.init_configs()
env.broker.start()
env.storage_controller.start()
env.endpoint_storage.start()
env.object_storage.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
@@ -1612,18 +1612,16 @@ def test_storage_controller_heartbeats(
env = neon_env_builder.init_configs()
env.start()
env.storage_controller.allowed_errors.extend(
[
# Default log allow list permits connection errors, but this test will use error responses on
# the utilization endpoint.
".*Call to node.*management API.*failed.*failpoint.*",
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
".*Call to node.*management API.*failed.* Timeout.*",
# We will intentionally cause reconcile errors
".*Reconcile error.*",
]
# Default log allow list permits connection errors, but this test will use error responses on
# the utilization endpoint.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.*failpoint.*"
)
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.* Timeout.*"
)
# Initially we have two online pageservers
@@ -4242,63 +4240,6 @@ def test_storcon_create_delete_sk_down(
wait_until(timeline_deleted_on_sk)
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("num_safekeepers", [1, 2, 3])
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
def test_storcon_few_sk(
neon_env_builder: NeonEnvBuilder,
num_safekeepers: int,
deletion_subject: DeletionSubject,
):
"""
Test that the storcon can create and delete tenants and timelines with a limited/special number of safekeepers
- num_safekeepers: number of safekeepers.
- deletion_subject: test that both single timeline and whole tenant deletion work.
"""
neon_env_builder.num_safekeepers = num_safekeepers
safekeeper_list = list(range(1, num_safekeepers + 1))
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
child_timeline_id = env.create_branch("child_of_main", tenant_id)
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
if deletion_subject is DeletionSubject.TENANT:
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
else:
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
# ensure that there is log msgs for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
"""
@@ -4520,56 +4461,3 @@ def test_storage_controller_migrate_with_pageserver_restart(
"shards": [{"node_id": int(secondary.id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
def test_storage_controller_shard_scheduling_policy_essential(neon_env_builder: NeonEnvBuilder):
"""
Check if essential scheduling policy works as expected.
"""
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
env.storage_controller.tenant_create(env.initial_tenant)
env.storage_controller.tenant_policy_update(
env.initial_tenant,
{
"placement": {"Attached": 1},
"scheduling": "Essential",
},
)
env.storage_controller.reconcile_until_idle()
# Ensure that the tenant is attached to both: one is primary, the other is secondary
pageserver_1_attachments = (
env.pageservers[0].http_client().tenant_list_locations()["tenant_shards"]
)
pageserver_2_attachments = (
env.pageservers[1].http_client().tenant_list_locations()["tenant_shards"]
)
assert len(pageserver_1_attachments) == 1
assert len(pageserver_2_attachments) == 1
primary_pageserver = None
if pageserver_1_attachments[0][1]["mode"] == "AttachedSingle":
primary_pageserver = 0
assert pageserver_2_attachments[0][1]["mode"] == "Secondary"
elif pageserver_1_attachments[0][1]["mode"] == "Secondary":
primary_pageserver = 1
assert pageserver_2_attachments[0][1]["mode"] == "AttachedSingle"
else:
assert False, "unreachable"
secondary_pageserver = 1 - primary_pageserver
# # Ensure the tenant gets attached to the secondary pageserver
# env.pageservers[primary_pageserver].stop()
# env.storage_controller.node_configure(
# env.pageservers[primary_pageserver].id, {"availability": "Offline"}
# )
env.storage_controller.node_drain(env.pageservers[primary_pageserver].id)
env.storage_controller.reconcile_until_idle()
assert (
env.pageservers[secondary_pageserver]
.http_client()
.tenant_list_locations()["tenant_shards"][0][1]["mode"]
== "AttachedSingle"
)