Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-11 06:30:37 +00:00)

Compare commits: luist18/po ... alek/abort (27 commits)
Commits in this comparison:

- 09dacdacfb
- 3db74c151f
- 11531b5e83
- f2095d7264
- f14dc1abbb
- f64fb4eadc
- cc197fb00b
- f999db9a9e
- 1579808218
- 2848a290e7
- e173a218f3
- 6c6b457271
- f9c93d259a
- ef120693bc
- b58a29f8f3
- a6097408cc
- 986ee66358
- c7cb5f7119
- cca54fdfbf
- c7492fa094
- aba3fafe2c
- fcc57f49d1
- 285f687e1b
- 60adcb18e5
- dc45bd2177
- 5336aea799
- 5b2dcfa4e1
Cargo.lock (generated, 47 lines changed)
@@ -740,6 +740,9 @@ name = "cc"
|
||||
version = "1.0.79"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
@@ -907,12 +910,14 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"postgres",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
@@ -920,6 +925,7 @@ dependencies = [
|
||||
"url",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -980,6 +986,7 @@ dependencies = [
|
||||
"tar",
|
||||
"thiserror",
|
||||
"toml",
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
@@ -1972,6 +1979,15 @@ version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.63"
|
||||
@@ -5293,6 +5309,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"cc",
|
||||
"chrono",
|
||||
"clap",
|
||||
"clap_builder",
|
||||
@@ -5393,3 +5410,33 @@ name = "zeroize"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c"
|
||||
dependencies = [
|
||||
"zstd-safe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-safe"
|
||||
version = "6.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-sys"
|
||||
version = "2.0.8+zstd.1.5.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
]
|
||||
|
||||
@@ -32,3 +32,6 @@ url.workspace = true
|
||||
compute_api.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
toml_edit.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
zstd = "0.12.4"
|
||||
|
||||
@@ -5,6 +5,8 @@
//! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
//! - Every start is a fresh start, so the data directory is removed and
//!   initialized again on each run.
//! - If remote_extension_config is provided, it will be used to fetch the extension list
//!   and download `shared_preload_libraries` from the remote storage.
//! - Next it will put configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get commit LSN.
//! - Get `basebackup` from pageserver using the LSN returned in the previous step.
@@ -27,15 +29,16 @@
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -S /var/db/postgres/specs/current.json \
//!             -b /usr/local/bin/postgres
//!             -b /usr/local/bin/postgres \
//!             -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http://localhost:9000"}
//! ```
//!
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::panic;
|
||||
use std::path::Path;
|
||||
use std::process::exit;
|
||||
use std::sync::{mpsc, Arc, Condvar, Mutex};
|
||||
use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock};
|
||||
use std::{thread, time::Duration};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
@@ -48,22 +51,33 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
|
||||
const BUILD_TAG_DEFAULT: &str = "local";
|
||||
// This is an arbitrary build tag, fine as a default / for testing purposes
// in case the BUILD_TAG environment variable is not set.
|
||||
const BUILD_TAG_DEFAULT: &str = "5670669815";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
|
||||
|
||||
let build_tag = option_env!("BUILD_TAG")
|
||||
.unwrap_or(BUILD_TAG_DEFAULT)
|
||||
.to_string();
|
||||
info!("build_tag: {build_tag}");
|
||||
|
||||
let matches = cli().get_matches();
|
||||
let pgbin_default = String::from("postgres");
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
|
||||
|
||||
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
|
||||
let ext_remote_storage = remote_ext_config.map(|x| {
|
||||
init_remote_storage(x).expect("cannot initialize remote extension storage from config")
|
||||
});
|
||||
|
||||
let http_port = *matches
|
||||
.get_one::<u16>("http-port")
|
||||
@@ -128,9 +142,6 @@ fn main() -> Result<()> {
|
||||
let compute_id = matches.get_one::<String>("compute-id");
|
||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
@@ -168,6 +179,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
let spec_set;
|
||||
|
||||
if let Some(spec) = spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
@@ -179,9 +191,15 @@ fn main() -> Result<()> {
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
pgversion: get_pg_version(pgbin),
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage,
|
||||
ext_remote_paths: OnceLock::new(),
|
||||
started_to_download_extensions: Mutex::new(HashSet::new()),
|
||||
library_index: OnceLock::new(),
|
||||
build_tag,
|
||||
};
|
||||
let compute = Arc::new(compute_node);
|
||||
|
||||
@@ -190,6 +208,8 @@ fn main() -> Result<()> {
|
||||
let _http_handle =
|
||||
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
||||
|
||||
let extension_server_port: u16 = http_port;
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
@@ -230,7 +250,7 @@ fn main() -> Result<()> {
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
let mut exit_code = None;
|
||||
let pg = match compute.start_compute() {
|
||||
let pg = match compute.start_compute(extension_server_port) {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:?}", err);
|
||||
@@ -359,6 +379,12 @@ fn cli() -> clap::Command {
|
||||
.long("control-plane-uri")
|
||||
.value_name("CONTROL_PLANE_API_BASE_URI"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("remote-ext-config")
|
||||
.short('r')
|
||||
.long("remote-ext-config")
|
||||
.value_name("REMOTE_EXT_CONFIG"),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
use std::io::BufRead;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::sync::{Condvar, Mutex, OnceLock};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::future::join_all;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio;
|
||||
use tokio_postgres;
|
||||
use tracing::{info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -18,9 +22,11 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
use crate::config;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
use crate::{config, extension_server};
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
@@ -28,6 +34,7 @@ pub struct ComputeNode {
|
||||
pub connstr: url::Url,
|
||||
pub pgdata: String,
|
||||
pub pgbin: String,
|
||||
pub pgversion: String,
|
||||
/// We should only allow live re- / configuration of the compute node if
|
||||
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
||||
/// the latest configuration. Otherwise, there could be a case:
|
||||
@@ -47,6 +54,13 @@ pub struct ComputeNode {
|
||||
pub state: Mutex<ComputeState>,
|
||||
/// `Condvar` to allow notifying waiters about state changes.
|
||||
pub state_changed: Condvar,
|
||||
/// the S3 bucket that we search for extensions in
|
||||
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
||||
// (key: extension name, value: path to extension archive in remote storage)
|
||||
pub ext_remote_paths: OnceLock<HashMap<String, RemotePath>>,
|
||||
pub library_index: OnceLock<HashMap<String, String>>,
|
||||
pub started_to_download_extensions: Mutex<HashSet<String>>,
|
||||
pub build_tag: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -357,14 +371,22 @@ impl ComputeNode {
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip_all)]
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
pub fn prepare_pgdata(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
extension_server_port: u16,
|
||||
) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Remove/create an empty pgdata directory and put configuration there.
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
||||
config::write_postgres_conf(
|
||||
&pgdata_path.join("postgresql.conf"),
|
||||
&pspec.spec,
|
||||
Some(extension_server_port),
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
// is already connected it will be kicked out, so a secondary (standby)
|
||||
@@ -506,7 +528,7 @@ impl ComputeNode {
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
@@ -536,7 +558,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
@@ -547,7 +569,26 @@ impl ComputeNode {
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
// This part is sync, because we need to download
|
||||
// remote shared_preload_libraries before postgres start (if any)
|
||||
{
|
||||
let library_load_start_time = Utc::now();
|
||||
self.prepare_preload_libraries(&compute_state)?;
|
||||
|
||||
let library_load_time = Utc::now()
|
||||
.signed_duration_since(library_load_start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.load_libraries_ms = library_load_time;
|
||||
info!(
|
||||
"Loading shared_preload_libraries took {:?}ms",
|
||||
library_load_time
|
||||
);
|
||||
}
|
||||
|
||||
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
@@ -695,4 +736,126 @@ LIMIT 100",
|
||||
"{{\"pg_stat_statements\": []}}".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
// If remote extension storage is configured,
|
||||
// download extension control files
|
||||
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new());
|
||||
info!("custom extensions: {:?}", &custom_ext);
|
||||
let (ext_remote_paths, library_index) = extension_server::get_available_extensions(
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&self.pgversion,
|
||||
&custom_ext,
|
||||
&self.build_tag,
|
||||
)
|
||||
.await?;
|
||||
self.ext_remote_paths
|
||||
.set(ext_remote_paths)
|
||||
.expect("this is the only time we set ext_remote_paths");
|
||||
self.library_index
|
||||
.set(library_index)
|
||||
.expect("this is the only time we set library_index");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn download_extension(&self, ext_name: &str, is_library: bool) -> Result<()> {
|
||||
match &self.ext_remote_storage {
|
||||
None => anyhow::bail!("No remote extension storage"),
|
||||
Some(remote_storage) => {
|
||||
let mut real_ext_name = ext_name.to_string();
|
||||
if is_library {
|
||||
real_ext_name = real_ext_name.replace(".so", "");
|
||||
real_ext_name = self
|
||||
.library_index
|
||||
.get()
|
||||
.expect("must have already downloaded the library_index")[&real_ext_name]
|
||||
.clone();
|
||||
}
|
||||
|
||||
{
|
||||
let mut started_to_download_extensions = self
|
||||
.started_to_download_extensions
|
||||
.lock()
|
||||
.expect("bad lock");
|
||||
if started_to_download_extensions.contains(&real_ext_name) {
|
||||
info!(
|
||||
"extension {:?} already exists, skipping download",
|
||||
&ext_name
|
||||
);
|
||||
return Ok(());
|
||||
} else {
|
||||
started_to_download_extensions.insert(real_ext_name.clone());
|
||||
}
|
||||
}
|
||||
extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
&self
|
||||
.ext_remote_paths
|
||||
.get()
|
||||
.expect("error accessing ext_remote_paths")[&real_ext_name],
|
||||
remote_storage,
|
||||
&self.pgbin,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn prepare_preload_libraries(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
if self.ext_remote_storage.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
|
||||
info!("parse shared_preload_libraries from spec.cluster.settings");
|
||||
let mut libs_vec = Vec::new();
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
libs_vec = libs
|
||||
.split(&[',', '\'', ' '])
|
||||
.filter(|s| *s != "neon" && !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
}
|
||||
info!("parse shared_preload_libraries from provided postgresql.conf");
|
||||
// that is used in neon_local and python tests
|
||||
if let Some(conf) = &spec.cluster.postgresql_conf {
|
||||
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
|
||||
let mut shared_preload_libraries_line = "";
|
||||
for line in conf_lines {
|
||||
if line.starts_with("shared_preload_libraries") {
|
||||
shared_preload_libraries_line = line;
|
||||
}
|
||||
}
|
||||
let mut preload_libs_vec = Vec::new();
|
||||
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
|
||||
preload_libs_vec = libs
|
||||
.split(&[',', '\'', ' '])
|
||||
.filter(|s| *s != "neon" && !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
}
|
||||
libs_vec.extend(preload_libs_vec);
|
||||
}
|
||||
|
||||
info!("Download ext_index.json, find the extension paths");
|
||||
self.prepare_external_extensions(compute_state).await?;
|
||||
|
||||
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
|
||||
let mut download_tasks = Vec::new();
|
||||
for library in &libs_vec {
|
||||
download_tasks.push(self.download_extension(library, true));
|
||||
}
|
||||
let results = join_all(download_tasks).await;
|
||||
for result in results {
|
||||
result?; // propagate any errors
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
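Both preload-library sources above (the spec settings value and the raw `postgresql.conf` text) end up going through the same split-and-filter step. Below is a stand-alone sketch of the `postgresql.conf` parsing path used by `prepare_preload_libraries`; the function name and the sample value are illustrative, not part of the change:

```rust
// Illustrative sketch of the conf-line filtering used by
// prepare_preload_libraries above; not the actual function.
fn parse_preload_libs(conf_line: &str) -> Vec<String> {
    conf_line
        .split("='")                      // take the value after shared_preload_libraries='
        .nth(1)
        .unwrap_or("")
        .split(&[',', '\'', ' '])         // split on commas, quotes and spaces
        .filter(|s| *s != "neon" && !s.is_empty())
        .map(str::to_string)
        .collect()
}

// parse_preload_libs("shared_preload_libraries='neon,pg_stat_statements'")
// yields ["pg_stat_statements"]: the built-in "neon" library is filtered out
// because it ships with the compute image and never needs downloading.
```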
@@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
|
||||
}
|
||||
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
pub fn write_postgres_conf(
|
||||
path: &Path,
|
||||
spec: &ComputeSpec,
|
||||
extension_server_port: Option<u16>,
|
||||
) -> Result<()> {
|
||||
// File::create() destroys the file content if it exists.
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
@@ -87,5 +91,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
if let Some(port) = extension_server_port {
|
||||
writeln!(file, "neon.extension_server_port={}", port)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
compute_tools/src/extension_server.rs (new file, 257 lines)
@@ -0,0 +1,257 @@
|
||||
// Download extension files from the extension store
|
||||
// and put them in the right place in the postgres directory (share / lib)
|
||||
/*
|
||||
The layout of the S3 bucket is as follows:
|
||||
5615610098 // this is an extension build number
|
||||
├── v14
|
||||
│ ├── extensions
|
||||
│ │ ├── anon.tar.zst
|
||||
│ │ └── embedding.tar.zst
|
||||
│ └── ext_index.json
|
||||
└── v15
|
||||
├── extensions
|
||||
│ ├── anon.tar.zst
|
||||
│ └── embedding.tar.zst
|
||||
└── ext_index.json
|
||||
5615261079
|
||||
├── v14
|
||||
│ ├── extensions
|
||||
│ │ └── anon.tar.zst
|
||||
│ └── ext_index.json
|
||||
└── v15
|
||||
├── extensions
|
||||
│ └── anon.tar.zst
|
||||
└── ext_index.json
|
||||
5623261088
|
||||
├── v14
|
||||
│ ├── extensions
|
||||
│ │ └── embedding.tar.zst
|
||||
│ └── ext_index.json
|
||||
└── v15
|
||||
├── extensions
|
||||
│ └── embedding.tar.zst
|
||||
└── ext_index.json
|
||||
|
||||
Note that build number cannot be part of prefix because we might need extensions
|
||||
from other build numbers.
|
||||
|
||||
ext_index.json stores the control files and location of extension archives
|
||||
|
||||
We do not duplicate extension.tar.zst files.
|
||||
We only upload a new one if it is updated.
|
||||
*access* is controlled by spec
|
||||
|
||||
More specifically, here is an example ext_index.json
|
||||
{
|
||||
"embedding": {
|
||||
"control_data": {
|
||||
"embedding.control": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true"
|
||||
},
|
||||
"archive_path": "5623261088/v15/extensions/embedding.tar.zst"
|
||||
},
|
||||
"anon": {
|
||||
"control_data": {
|
||||
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
|
||||
},
|
||||
"archive_path": "5615261079/v15/extensions/anon.tar.zst"
|
||||
}
|
||||
}
|
||||
*/
|
||||
use anyhow::Context;
|
||||
use anyhow::{self, Result};
|
||||
use futures::future::join_all;
|
||||
use remote_storage::*;
|
||||
use serde_json;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Read;
|
||||
use std::num::{NonZeroU32, NonZeroUsize};
|
||||
use std::path::Path;
|
||||
use std::str;
|
||||
use tar::Archive;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::info;
|
||||
use tracing::log::warn;
|
||||
use zstd::stream::read::Decoder;
|
||||
|
||||
fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||
// gives the result of `pg_config [argument]`
|
||||
// where argument is a flag like `--version` or `--sharedir`
|
||||
let pgconfig = pgbin
|
||||
.strip_suffix("postgres")
|
||||
.expect("bad pgbin")
|
||||
.to_owned()
|
||||
+ "/pg_config";
|
||||
let config_output = std::process::Command::new(pgconfig)
|
||||
.arg(argument)
|
||||
.output()
|
||||
.expect("pg_config error");
|
||||
std::str::from_utf8(&config_output.stdout)
|
||||
.expect("pg_config error")
|
||||
.trim()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub fn get_pg_version(pgbin: &str) -> String {
|
||||
// pg_config --version returns a (platform specific) human readable string
|
||||
// such as "PostgreSQL 15.4". We parse this to v14/v15
|
||||
let human_version = get_pg_config("--version", pgbin);
|
||||
if human_version.contains("15") {
|
||||
return "v15".to_string();
|
||||
} else if human_version.contains("14") {
|
||||
return "v14".to_string();
|
||||
}
|
||||
panic!("Unsupported postgres version {human_version}");
|
||||
}
|
||||
|
||||
// download control files for enabled_extensions
|
||||
// return the paths in s3 to the archives containing the actual extension files
|
||||
// for use in creating the extension
|
||||
pub async fn get_available_extensions(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
pg_version: &str,
|
||||
custom_extensions: &[String],
|
||||
build_tag: &str,
|
||||
) -> Result<(HashMap<String, RemotePath>, HashMap<String, String>)> {
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
let index_path = format!("{build_tag}/{pg_version}/ext_index.json");
|
||||
let index_path = RemotePath::new(Path::new(&index_path)).context("error forming path")?;
|
||||
info!("download ext_index.json from: {:?}", &index_path);
|
||||
|
||||
let mut download = better_download(remote_storage, &index_path).await?;
|
||||
let mut ext_idx_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut ext_idx_buffer)
|
||||
.await?;
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct Index {
|
||||
public_extensions: Vec<String>,
|
||||
library_index: HashMap<String, String>,
|
||||
extension_data: HashMap<String, ExtensionData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct ExtensionData {
|
||||
control_data: HashMap<String, String>,
|
||||
archive_path: String,
|
||||
}
|
||||
|
||||
let ext_index_full = serde_json::from_slice::<Index>(&ext_idx_buffer)?;
|
||||
let mut enabled_extensions = ext_index_full.public_extensions;
|
||||
enabled_extensions.extend_from_slice(custom_extensions);
|
||||
let library_index = ext_index_full.library_index;
|
||||
let all_extension_data = ext_index_full.extension_data;
|
||||
|
||||
info!("library_index {:?}", &library_index);
|
||||
info!("enabled_extensions: {:?}", enabled_extensions);
|
||||
let mut ext_remote_paths = HashMap::new();
|
||||
let mut file_create_tasks = Vec::new();
|
||||
for extension in enabled_extensions {
|
||||
let ext_data = &all_extension_data[&extension];
|
||||
for (control_file, control_contents) in &ext_data.control_data {
|
||||
let extension_name = control_file
|
||||
.strip_suffix(".control")
|
||||
.expect("control files must end in .control");
|
||||
ext_remote_paths.insert(
|
||||
extension_name.to_string(),
|
||||
RemotePath::from_string(&ext_data.archive_path)?,
|
||||
);
|
||||
|
||||
let control_path = local_sharedir.join(control_file);
|
||||
info!("writing file {:?}{:?}", control_path, control_contents);
|
||||
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
|
||||
}
|
||||
}
|
||||
let results = join_all(file_create_tasks).await;
|
||||
for result in results {
|
||||
result?;
|
||||
}
|
||||
info!("ext_remote_paths {:?}", ext_remote_paths);
|
||||
Ok((ext_remote_paths, library_index))
|
||||
}
|
||||
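The `Index` struct above expects `public_extensions`, `library_index`, and `extension_data` as top-level keys, with per-extension control files and archive paths nested under `extension_data`. Here is a hand-written sample that deserializes into that shape; the extension name, paths, and build number are made up for illustration:

```rust
use std::collections::HashMap;

// Stand-alone mirror of the Index/ExtensionData structs defined inside
// get_available_extensions above, plus a fabricated sample document.
#[derive(Debug, serde::Deserialize)]
struct SampleIndex {
    public_extensions: Vec<String>,
    library_index: HashMap<String, String>,
    extension_data: HashMap<String, SampleExtensionData>,
}

#[derive(Debug, serde::Deserialize)]
struct SampleExtensionData {
    control_data: HashMap<String, String>,
    archive_path: String,
}

fn main() -> anyhow::Result<()> {
    let sample = r#"{
        "public_extensions": ["embedding"],
        "library_index": { "embedding": "embedding" },
        "extension_data": {
            "embedding": {
                "control_data": {
                    "embedding.control": "default_version = '0.1.0'\nmodule_pathname = '$libdir/embedding'"
                },
                "archive_path": "5670669815/v15/extensions/embedding.tar.zst"
            }
        }
    }"#;
    let index: SampleIndex = serde_json::from_str(sample)?;
    assert_eq!(index.public_extensions, vec!["embedding".to_string()]);
    assert!(index.library_index.contains_key("embedding"));
    assert!(index.extension_data.contains_key("embedding"));
    Ok(())
}
```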
|
||||
// download the archive for a given extension,
|
||||
// unzip it, and place files in the appropriate locations (share/lib)
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
ext_path: &RemotePath,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
) -> Result<()> {
|
||||
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
||||
let mut download = better_download(remote_storage, ext_path).await?;
|
||||
let mut download_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut download_buffer)
|
||||
.await?;
|
||||
let mut decoder = Decoder::new(download_buffer.as_slice())?;
|
||||
let mut decompress_buffer = Vec::new();
|
||||
decoder.read_to_end(&mut decompress_buffer)?;
|
||||
let mut archive = Archive::new(decompress_buffer.as_slice());
|
||||
let unzip_dest = pgbin
|
||||
.strip_suffix("/bin/postgres")
|
||||
.expect("bad pgbin")
|
||||
.to_string()
|
||||
+ "/download_extensions";
|
||||
archive.unpack(&unzip_dest)?;
|
||||
info!("Download + unzip {:?} completed successfully", &ext_path);
|
||||
|
||||
let sharedir_paths = (
|
||||
unzip_dest.to_string() + "/share/extension",
|
||||
Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
|
||||
);
|
||||
let libdir_paths = (
|
||||
unzip_dest.to_string() + "/lib",
|
||||
Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql"),
|
||||
);
|
||||
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
|
||||
for paths in [sharedir_paths, libdir_paths] {
|
||||
let (zip_dir, real_dir) = paths;
|
||||
info!("mv {zip_dir:?}/* {real_dir:?}");
|
||||
for file in std::fs::read_dir(zip_dir)? {
|
||||
let old_file = file?.path();
|
||||
let new_file =
|
||||
Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
|
||||
info!("moving {old_file:?} to {new_file:?}");
|
||||
|
||||
// extension download failed: Directory not empty (os error 39)
|
||||
match std::fs::rename(old_file, new_file) {
|
||||
Ok(()) => info!("move succeeded"),
|
||||
Err(e) => {
|
||||
warn!("move failed, probably because the extension already exists: {e}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This function initializes the necessary structs to use remote storage (should be fairly cheap)
|
||||
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct RemoteExtJson {
|
||||
bucket: String,
|
||||
region: String,
|
||||
endpoint: Option<String>,
|
||||
prefix: Option<String>,
|
||||
}
|
||||
let remote_ext_json = serde_json::from_str::<RemoteExtJson>(remote_ext_config)?;
|
||||
|
||||
let config = S3Config {
|
||||
bucket_name: remote_ext_json.bucket,
|
||||
bucket_region: remote_ext_json.region,
|
||||
prefix_in_bucket: remote_ext_json.prefix,
|
||||
endpoint: remote_ext_json.endpoint,
|
||||
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_keys_per_list_response: None,
|
||||
};
|
||||
let config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
|
||||
storage: RemoteStorageKind::AwsS3(config),
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config)
|
||||
}
|
||||
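For reference, the string passed on the `-r` / `--remote-ext-config` command line only has to deserialize into the `RemoteExtJson` shape above; `endpoint` and `prefix` are optional. A minimal, self-contained check of that contract (bucket, region, endpoint, and prefix values here are placeholders):

```rust
use serde::Deserialize;

// Mirrors the RemoteExtJson struct used by init_remote_storage above.
#[derive(Debug, Deserialize)]
struct RemoteExtJson {
    bucket: String,
    region: String,
    endpoint: Option<String>,
    prefix: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // Placeholder values; this is the same JSON that `neon_local endpoint start
    // --remote-ext-config ...` forwards to `compute_ctl -r ...` later in this diff.
    let arg = r#"{"bucket": "neon-ext", "region": "eu-central-1",
                  "endpoint": "http://localhost:9000", "prefix": "ext"}"#;
    let cfg: RemoteExtJson = serde_json::from_str(arg)?;
    assert_eq!(cfg.region, "eu-central-1");
    assert!(cfg.endpoint.is_some() && cfg.prefix.is_some());
    Ok(())
}
```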
@@ -121,6 +121,37 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from S3 on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
info!("req.uri {:?}", req.uri());
|
||||
|
||||
let mut is_library = false;
|
||||
if let Some(params) = req.uri().query() {
|
||||
info!("serving {:?} POST request with params: {}", route, params);
|
||||
if params == "is_library=true" {
|
||||
is_library = true;
|
||||
} else {
|
||||
let mut resp = Response::new(Body::from("Wrong request parameters"));
|
||||
*resp.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return resp;
|
||||
}
|
||||
}
|
||||
|
||||
let filename = route.split('/').last().unwrap().to_string();
|
||||
info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
|
||||
|
||||
match compute.download_extension(&filename, is_library).await {
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("extension download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the `404 Not Found` for any other routes.
|
||||
_ => {
|
||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||
|
||||
@@ -139,6 +139,34 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
/extension_server:
|
||||
post:
|
||||
tags:
|
||||
- Extension
|
||||
summary: Download extension from S3 to local folder.
|
||||
description: ""
|
||||
operationId: downloadExtension
|
||||
responses:
|
||||
200:
|
||||
description: Extension downloaded
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
description: Error text or 'OK' if download succeeded.
|
||||
example: "OK"
|
||||
400:
|
||||
description: Request is invalid.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
500:
|
||||
description: Extension download request failed.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
|
||||
@@ -9,6 +9,7 @@ pub mod http;
|
||||
#[macro_use]
|
||||
pub mod logger;
|
||||
pub mod compute;
|
||||
pub mod extension_server;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
|
||||
@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
|
||||
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
|
||||
// File `postgresql.conf` is no longer included into `basebackup`, so just
|
||||
// always write all config into it creating new file.
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
|
||||
|
||||
update_pg_hba(pgdata_path)?;
|
||||
|
||||
|
||||
@@ -32,3 +32,4 @@ utils.workspace = true
|
||||
|
||||
compute_api.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -658,6 +658,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
|
||||
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
|
||||
|
||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||
let safekeepers =
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
@@ -699,7 +701,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
_ => {}
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers)?;
|
||||
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
} else {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
@@ -743,7 +745,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
pg_version,
|
||||
mode,
|
||||
)?;
|
||||
ep.start(&auth_token, safekeepers)?;
|
||||
ep.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
}
|
||||
}
|
||||
"stop" => {
|
||||
@@ -1003,6 +1005,12 @@ fn cli() -> Command {
|
||||
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
|
||||
.required(false);
|
||||
|
||||
let remote_ext_config_args = Arg::new("remote-ext-config")
|
||||
.long("remote-ext-config")
|
||||
.num_args(1)
|
||||
.help("Configure the S3 bucket that we search for extensions in.")
|
||||
.required(false);
|
||||
|
||||
let lsn_arg = Arg::new("lsn")
|
||||
.long("lsn")
|
||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||
@@ -1161,6 +1169,7 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg)
|
||||
.arg(hot_standby_arg)
|
||||
.arg(safekeepers_arg)
|
||||
.arg(remote_ext_config_args)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
|
||||
@@ -313,7 +313,7 @@ impl Endpoint {
|
||||
|
||||
// TODO: use future host field from safekeeper spec
|
||||
// Pass the list of safekeepers to the replica so that it can connect to any of them,
|
||||
// whichever is availiable.
|
||||
// whichever is available.
|
||||
let sk_ports = self
|
||||
.env
|
||||
.safekeepers
|
||||
@@ -420,7 +420,12 @@ impl Endpoint {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
|
||||
pub fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
remote_ext_config: Option<&String>,
|
||||
) -> Result<()> {
|
||||
if self.status() == "running" {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
}
|
||||
@@ -488,6 +493,13 @@ impl Endpoint {
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
// TODO FIXME: This is a hack to test custom extensions locally.
|
||||
// In test_download_extensions, we assume that the custom extension
|
||||
// prefix is the tenant ID. So we set it here.
|
||||
//
|
||||
// The proper way to implement this is to pass the custom extension
|
||||
// in spec, but we don't have a way to do that yet in the python tests.
|
||||
custom_extensions: Some(vec!["kq_imcx".into()]),
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
@@ -519,6 +531,11 @@ impl Endpoint {
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
|
||||
// Write down the pid so we can wait for it when we want to stop
|
||||
|
||||
docs/rfcs/024-extension-loading.md (new file, 218 lines)
@@ -0,0 +1,218 @@
# Supporting custom user Extensions (Dynamic Extension Loading)
Created 2023-05-03

## Motivation

There are many extensions in the PostgreSQL ecosystem, and not all of them are
of a quality that we can confidently support. Additionally, our current
extension inclusion mechanism has several problems because we build all
extensions into the primary Compute image: we build the extensions every time
we build the compute image, regardless of whether we actually need to rebuild
the image, and the inclusion of these extensions in the image adds a hard
dependency on all supported extensions - thus increasing the image size, and
with it the time it takes to download that image - increasing first start
latency.

This RFC proposes a dynamic loading mechanism that solves most of these
problems.

## Summary

`compute_ctl` is made responsible for loading extensions on-demand into
the container's file system for dynamically loaded extensions, and will also
make sure that the extensions in `shared_preload_libraries` are downloaded
before the compute node starts.

## Components

compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store

## Requirements

Compute nodes with no extra extensions should not be negatively impacted by
the existence of support for many extensions.

Installing an extension into PostgreSQL should be easy.

Non-preloaded extensions shouldn't impact startup latency.

Uninstalled extensions shouldn't impact query latency.

A small latency penalty for dynamically loaded extensions is acceptable in
the first seconds of compute startup, but not in steady-state operations.

## Proposed implementation

### On-demand, JIT-loading of extensions

Before postgres starts we download:
- control files for all extensions available to that compute node;
- all `shared_preload_libraries`.

After postgres is running, `compute_ctl` listens for requests to load files.
When PostgreSQL requests a file, `compute_ctl` downloads it (a minimal sketch
of such a request follows the list below).

PostgreSQL requests files in the following cases:
- When loading a preload library set in `local_preload_libraries`
- When explicitly loading a library with `LOAD`
- When creating an extension with `CREATE EXTENSION` (downloads the SQL scripts, optional extension data files, and optional library files)
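A minimal sketch of one such request from the compute's perspective. This is illustrative only: the port and file name are made up, the real caller is the `neon` extension's curl-based hook (see `pgxn/neon/extension_server.c` in this change), and it assumes an HTTP client such as `reqwest` with its blocking API is available:

```rust
// Sketch of the on-demand request: postgres is missing a file, so it asks
// compute_ctl to fetch it from the extension store and place it locally.
fn request_extension_file() -> reqwest::Result<()> {
    // Assumed values: compute_ctl's HTTP server on port 3080, a SQL script name.
    let url = "http://localhost:3080/extension_server/embedding--0.1.0.sql";
    let status = reqwest::blocking::Client::new().post(url).send()?.status();
    // On success compute_ctl has downloaded and unpacked the file into the
    // postgres share/lib directories, and postgres retries opening it.
    assert!(status.is_success());
    Ok(())
}
```

Library files use the same route with `?is_library=true` appended.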
#### Summary

Pros:
- Startup is only as slow as it takes to load all (shared_)preload_libraries
- Supports BYO Extension

Cons:
- O(sizeof(extensions)) IO requirement for loading all extensions.

### Alternative solutions

1. Allow users to add their extensions to the base image

   Pros:
   - Easy to deploy

   Cons:
   - Doesn't scale - first start latency is dependent on image size;
   - All extensions are shared across all users: it doesn't allow users to
     bring their own restrictively licensed extensions

2. Bring Your Own compute image

   Pros:
   - Still easy to deploy
   - User can bring their own patched version of PostgreSQL

   Cons:
   - First start latency is O(sizeof(extensions image))
   - Warm instance pool for skipping pod schedule latency is not feasible with
     O(n) custom images
   - Support channels are difficult to manage

3. Download all user extensions in bulk on compute start

   Pros:
   - Easy to deploy
   - No startup latency issues for "clean" users.
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - Downloading all extensions in advance takes a lot of time, thus startup
     latency issues

4. Store user's extensions in persistent storage

   Pros:
   - Easy to deploy
   - No startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - EC2 instances have only a limited number of attachments shared between EBS
     volumes, direct-attached NVMe drives, and ENIs.
   - Compute instance migration isn't trivially solved for EBS mounts (e.g.
     the device is unavailable whilst moving the mount between instances).
   - EBS can only mount on one instance at a time (except the expensive IO2
     device type).

5. Store user's extensions in a network drive

   Pros:
   - Easy to deploy
   - Few startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - We'd need networked drives, and a lot of them, which would store many
     duplicate extensions.
   - **UNCHECKED:** Compute instance migration may not work nicely with
     networked IOs

### Idea extensions

The extension store does not have to be S3 directly, but could be a Node-local
caching service on top of S3. This would reduce the load on the network for
popular extensions.

## Extension Storage implementation

The layout of the S3 bucket is as follows:
```
5615610098 // this is an extension build number
├── v14
│   ├── extensions
│   │   ├── anon.tar.zst
│   │   └── embedding.tar.zst
│   └── ext_index.json
└── v15
    ├── extensions
    │   ├── anon.tar.zst
    │   └── embedding.tar.zst
    └── ext_index.json
5615261079
├── v14
│   ├── extensions
│   │   └── anon.tar.zst
│   └── ext_index.json
└── v15
    ├── extensions
    │   └── anon.tar.zst
    └── ext_index.json
5623261088
├── v14
│   ├── extensions
│   │   └── embedding.tar.zst
│   └── ext_index.json
└── v15
    ├── extensions
    │   └── embedding.tar.zst
    └── ext_index.json
```

Note that the build number cannot be part of the prefix because we might need
extensions from other build numbers.

ext_index.json stores the control files and the location of extension archives.

We do not duplicate extension.tar.zst files.
We only upload a new one if it is updated.
*Access* is controlled by spec.

More specifically, here is an example ext_index.json
```
{
    "embedding": {
        "control_data": {
            "embedding.control": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true"
        },
        "archive_path": "5623261088/v15/extensions/embedding.tar.zst"
    },
    "anon": {
        "control_data": {
            "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
        },
        "archive_path": "5615261079/v15/extensions/anon.tar.zst"
    }
}
```

### How to add new extension to the Extension Storage?

Simply upload build artifacts to the S3 bucket.
Implement a CI step for that, splitting it off from the compute-node-image build.

### How do we deal with extension versions and updates?

Currently, we rebuild extensions on every compute-node-image build and store them in the <build-version> prefix.
This is needed to ensure that `/share` and `/lib` files are in sync.

For extension updates, we rely on the PostgreSQL extension versioning mechanism (sql update scripts) and on extension authors to not break backwards compatibility within one major version of PostgreSQL.

### Alternatives

For extensions written in trusted languages we can also adopt the
`dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase.
This would increase the number of supported extensions and decrease the amount of work required to support them.
@@ -75,6 +75,7 @@ pub struct ComputeMetrics {
|
||||
pub start_postgres_ms: u64,
|
||||
pub config_ms: u64,
|
||||
pub total_startup_ms: u64,
|
||||
pub load_libraries_ms: u64,
|
||||
}
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
|
||||
@@ -60,6 +60,9 @@ pub struct ComputeSpec {
|
||||
/// If set, 'storage_auth_token' is used as the password to authenticate to
|
||||
/// the pageserver and safekeepers.
|
||||
pub storage_auth_token: Option<String>,
|
||||
|
||||
// list of prefixes to search for custom extensions in remote extension storage
|
||||
pub custom_extensions: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -24,6 +24,7 @@ use tokio::io;
|
||||
use toml_edit::Item;
|
||||
use tracing::info;
|
||||
|
||||
pub use self::s3_bucket::better_download;
|
||||
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};
|
||||
|
||||
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
|
||||
@@ -65,6 +66,10 @@ impl RemotePath {
|
||||
Ok(Self(relative_path.to_path_buf()))
|
||||
}
|
||||
|
||||
pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
|
||||
Self::new(Path::new(relative_path))
|
||||
}
|
||||
|
||||
pub fn with_base(&self, base_path: &Path) -> PathBuf {
|
||||
base_path.join(&self.0)
|
||||
}
|
||||
@@ -190,6 +195,20 @@ pub enum GenericRemoteStorage {
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
// A function for listing all the files in a "directory"
|
||||
// Example:
|
||||
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
|
||||
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_files(folder).await,
|
||||
Self::AwsS3(s) => s.list_files(folder).await,
|
||||
Self::Unreliable(s) => s.list_files(folder).await,
|
||||
}
|
||||
}
|
||||
|
||||
// lists common *prefixes*, if any, of files
|
||||
// Example:
|
||||
// list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
|
||||
pub async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
@@ -201,14 +220,6 @@ impl GenericRemoteStorage {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_files(folder).await,
|
||||
Self::AwsS3(s) => s.list_files(folder).await,
|
||||
Self::Unreliable(s) => s.list_files(folder).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload(
|
||||
&self,
|
||||
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
|
||||
@@ -31,7 +31,8 @@ use tracing::debug;
|
||||
|
||||
use super::StorageMetadata;
|
||||
use crate::{
|
||||
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
Download, DownloadError, GenericRemoteStorage, RemotePath, RemoteStorage, S3Config,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
|
||||
@@ -131,6 +132,39 @@ struct GetObjectRequest {
|
||||
key: String,
|
||||
range: Option<String>,
|
||||
}
|
||||
|
||||
use crate::GenericRemoteStorage::AwsS3;
|
||||
// the regular download function adds a "/" to the start of file names in the
|
||||
// case of prefix="None", which breaks everything. Thus, the following function is necessary
|
||||
pub async fn better_download(
|
||||
bucket: &GenericRemoteStorage,
|
||||
from: &RemotePath,
|
||||
) -> Result<Download, DownloadError> {
|
||||
if let AwsS3(bucket) = bucket {
|
||||
// this is more expected behavior.
|
||||
// prefix="" should result in a trailing slash
|
||||
// whereas prefix=None should **NOT** result in a trailing slash
|
||||
let query_key = match &bucket.prefix_in_bucket {
|
||||
Some(_) => bucket.relative_path_to_s3_object(from),
|
||||
None => from
|
||||
.get_path()
|
||||
.to_str()
|
||||
.expect("bad object name")
|
||||
.to_string(),
|
||||
};
|
||||
|
||||
bucket
|
||||
.download_object(GetObjectRequest {
|
||||
bucket: bucket.bucket_name.clone(),
|
||||
key: query_key,
|
||||
range: None,
|
||||
})
|
||||
.await
|
||||
} else {
|
||||
panic!("this isn't supposed to happen");
|
||||
}
|
||||
}
|
||||
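The comment above is the crux: with no `prefix_in_bucket`, the generic path-to-key conversion yields keys with a leading `/`, which S3 treats as a different object name. A tiny illustration of the key shapes `better_download` is aiming for; the prefix and paths are invented, and the real code goes through `relative_path_to_s3_object` when a prefix is configured:

```rust
// Illustration only, not the production code path.
fn expected_s3_key(prefix_in_bucket: Option<&str>, relative_path: &str) -> String {
    match prefix_in_bucket {
        // e.g. Some("ext") + "5670669815/v15/ext_index.json"
        //   -> "ext/5670669815/v15/ext_index.json"
        Some(prefix) => format!("{}/{}", prefix.trim_end_matches('/'), relative_path),
        // No prefix: the key must be the bare path, not "/5670669815/...".
        None => relative_path.to_string(),
    }
}
```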
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
|
||||
@@ -349,10 +383,17 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
/// See the doc for `RemoteStorage::list_files`
|
||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
let folder_name = folder
|
||||
let mut folder_name = folder
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
|
||||
// remove leading "/" if one exists
|
||||
if let Some(folder_name_slash) = folder_name.clone() {
|
||||
if folder_name_slash.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
folder_name = Some(folder_name_slash[1..].to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// AWS may need to break the response into several parts
|
||||
let mut continuation_token = None;
|
||||
let mut all_files = vec![];
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
MODULE_big = neon
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
libpagestore.o \
|
||||
libpqwalproposer.o \
|
||||
|
||||
pgxn/neon/extension_server.c (new file, 103 lines)
@@ -0,0 +1,103 @@
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* extension_server.c
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "tcop/pquery.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "access/xact.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/acl.h"
|
||||
#include "fmgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "port.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include <curl/curl.h>
|
||||
|
||||
static int extension_server_port = 0;
|
||||
|
||||
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
|
||||
|
||||
// to download all SQL (and data) files for an extension:
|
||||
// curl -X POST http://localhost:8080/extension_server/postgis
|
||||
// it covers two possible extension files layouts:
|
||||
// 1. extension_name--version--platform.sql
|
||||
// 2. extension_name/extension_name--version.sql
|
||||
// extension_name/extra_files.csv
|
||||
//
|
||||
// to download specific library file:
|
||||
// curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true
|
||||
static bool
|
||||
neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
{
|
||||
CURL *curl;
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
char *postdata;
|
||||
bool ret = false;
|
||||
|
||||
if ((curl = curl_easy_init()) == NULL)
|
||||
{
|
||||
elog(ERROR, "Failed to initialize curl handle");
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
|
||||
extension_server_port, filename, is_library ? "?is_library=true" : "");
|
||||
|
||||
elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
|
||||
|
||||
if (curl)
|
||||
{
|
||||
/* Perform the request, res will get the return code */
|
||||
res = curl_easy_perform(curl);
|
||||
/* Check for errors */
|
||||
if (res == CURLE_OK)
|
||||
{
|
||||
ret = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Don't error here because postgres will try to find the file
|
||||
// and will fail with some proper error message if it's not found.
|
||||
elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
|
||||
/* always cleanup */
|
||||
curl_easy_cleanup(curl);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void pg_init_extension_server()
|
||||
{
|
||||
// Port to connect to compute_ctl on localhost
|
||||
// to request extension files.
|
||||
DefineCustomIntVariable("neon.extension_server_port",
|
||||
"connection string to the compute_ctl",
|
||||
NULL,
|
||||
&extension_server_port,
|
||||
0, 0, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
// set download_extension_file_hook
|
||||
prev_download_extension_file_hook = download_extension_file_hook;
|
||||
download_extension_file_hook = neon_download_extension_file_http;
|
||||
}
|
||||
@@ -35,8 +35,11 @@ _PG_init(void)
|
||||
{
|
||||
pg_init_libpagestore();
|
||||
pg_init_walproposer();
|
||||
|
||||
InitControlPlaneConnector();
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
// Important: This must happen after other parts of the extension
|
||||
// are loaded, otherwise any settings to GUCs that were set before
|
||||
// the extension was loaded will be removed.
|
||||
|
||||
@@ -21,6 +21,8 @@ extern char *neon_tenant;
|
||||
extern void pg_init_libpagestore(void);
|
||||
extern void pg_init_walproposer(void);
|
||||
|
||||
extern void pg_init_extension_server(void);
|
||||
|
||||
/*
|
||||
* Returns true if we shouldn't do REDO on that block in record indicated by
|
||||
* block_id; false otherwise.
|
||||
|
||||
@@ -530,6 +530,16 @@ def available_remote_storages() -> List[RemoteStorageKind]:
|
||||
return remote_storages
|
||||
|
||||
|
||||
def available_s3_storages() -> List[RemoteStorageKind]:
|
||||
remote_storages = [RemoteStorageKind.MOCK_S3]
|
||||
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
|
||||
remote_storages.append(RemoteStorageKind.REAL_S3)
|
||||
log.info("Enabling real s3 storage for tests")
|
||||
else:
|
||||
log.info("Using mock implementations to test remote storage")
|
||||
return remote_storages
|
||||
|
||||
|
||||
@dataclass
|
||||
class LocalFsStorage:
|
||||
root: Path
|
||||
@@ -550,6 +560,16 @@ class S3Storage:
|
||||
"AWS_SECRET_ACCESS_KEY": self.secret_key,
|
||||
}
|
||||
|
||||
def to_string(self) -> str:
|
||||
return json.dumps(
|
||||
{
|
||||
"bucket": self.bucket_name,
|
||||
"region": self.bucket_region,
|
||||
"endpoint": self.endpoint,
|
||||
"prefix": self.prefix_in_bucket,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
RemoteStorage = Union[LocalFsStorage, S3Storage]
|
||||
|
||||
@@ -616,10 +636,12 @@ class NeonEnvBuilder:
        self.rust_log_override = rust_log_override
        self.port_distributor = port_distributor
        self.remote_storage = remote_storage
        self.ext_remote_storage: Optional[S3Storage] = None
        self.remote_storage_client: Optional[Any] = None
        self.remote_storage_users = remote_storage_users
        self.broker = broker
        self.run_id = run_id
        self.mock_s3_server = mock_s3_server
        self.mock_s3_server: MockS3Server = mock_s3_server
        self.pageserver_config_override = pageserver_config_override
        self.num_safekeepers = num_safekeepers
        self.safekeepers_id_start = safekeepers_id_start

@@ -667,15 +689,24 @@ class NeonEnvBuilder:
        remote_storage_kind: RemoteStorageKind,
        test_name: str,
        force_enable: bool = True,
        enable_remote_extensions: bool = False,
    ):
        if remote_storage_kind == RemoteStorageKind.NOOP:
            return
        elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
            self.enable_local_fs_remote_storage(force_enable=force_enable)
        elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
            self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
            self.enable_mock_s3_remote_storage(
                bucket_name=test_name,
                force_enable=force_enable,
                enable_remote_extensions=enable_remote_extensions,
            )
        elif remote_storage_kind == RemoteStorageKind.REAL_S3:
            self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
            self.enable_real_s3_remote_storage(
                test_name=test_name,
                force_enable=force_enable,
                enable_remote_extensions=enable_remote_extensions,
            )
        else:
            raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")

@@ -689,11 +720,18 @@ class NeonEnvBuilder:
        assert force_enable or self.remote_storage is None, "remote storage is enabled already"
        self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))

    def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
    def enable_mock_s3_remote_storage(
        self,
        bucket_name: str,
        force_enable: bool = True,
        enable_remote_extensions: bool = False,
    ):
        """
        Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
        Starts up the mock server, if that does not run yet.
        Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.

        Also creates the bucket for extensions (self.ext_remote_storage).
        """
        assert force_enable or self.remote_storage is None, "remote storage is enabled already"
        mock_endpoint = self.mock_s3_server.endpoint()

@@ -714,9 +752,25 @@ class NeonEnvBuilder:
            bucket_region=mock_region,
            access_key=self.mock_s3_server.access_key(),
            secret_key=self.mock_s3_server.secret_key(),
            prefix_in_bucket="pageserver",
        )

    def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
        if enable_remote_extensions:
            self.ext_remote_storage = S3Storage(
                bucket_name=bucket_name,
                endpoint=mock_endpoint,
                bucket_region=mock_region,
                access_key=self.mock_s3_server.access_key(),
                secret_key=self.mock_s3_server.secret_key(),
                prefix_in_bucket="ext",
            )

    def enable_real_s3_remote_storage(
        self,
        test_name: str,
        force_enable: bool = True,
        enable_remote_extensions: bool = False,
    ):
        """
        Sets up configuration to use real s3 endpoint without mock server
        """
@@ -756,6 +810,15 @@ class NeonEnvBuilder:
            prefix_in_bucket=self.remote_storage_prefix,
        )

        if enable_remote_extensions:
            self.ext_remote_storage = S3Storage(
                bucket_name="neon-dev-extensions-eu-central-1",
                bucket_region="eu-central-1",
                access_key=access_key,
                secret_key=secret_key,
                prefix_in_bucket=None,
            )

    def cleanup_local_storage(self):
        if self.preserve_database_files:
            return

@@ -789,6 +852,7 @@ class NeonEnvBuilder:
        # `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
        # so this line effectively a no-op
        assert isinstance(self.remote_storage, S3Storage)
        assert self.remote_storage_client is not None

        if self.keep_remote_storage_contents:
            log.info("keep_remote_storage_contents skipping remote storage cleanup")
@@ -918,6 +982,8 @@ class NeonEnv:
        self.neon_binpath = config.neon_binpath
        self.pg_distrib_dir = config.pg_distrib_dir
        self.endpoint_counter = 0
        self.remote_storage_client = config.remote_storage_client
        self.ext_remote_storage = config.ext_remote_storage

        # generate initial tenant ID here instead of letting 'neon init' generate it,
        # so that we don't need to dig it out of the config file afterwards.

@@ -1504,6 +1570,7 @@ class NeonCli(AbstractNeonCli):
        safekeepers: Optional[List[int]] = None,
        tenant_id: Optional[TenantId] = None,
        lsn: Optional[Lsn] = None,
        remote_ext_config: Optional[str] = None,
    ) -> "subprocess.CompletedProcess[str]":
        args = [
            "endpoint",

@@ -1513,6 +1580,8 @@ class NeonCli(AbstractNeonCli):
            "--pg-version",
            self.env.pg_version,
        ]
        if remote_ext_config is not None:
            args.extend(["--remote-ext-config", remote_ext_config])
        if lsn is not None:
            args.append(f"--lsn={lsn}")
        args.extend(["--pg-port", str(pg_port)])

@@ -2371,7 +2440,7 @@ class Endpoint(PgProtocol):

        return self

    def start(self) -> "Endpoint":
    def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
        """
        Start the Postgres instance.
        Returns self.

@@ -2387,6 +2456,7 @@ class Endpoint(PgProtocol):
            http_port=self.http_port,
            tenant_id=self.tenant_id,
            safekeepers=self.active_safekeepers,
            remote_ext_config=remote_ext_config,
        )
        self.running = True

@@ -2476,6 +2546,7 @@ class Endpoint(PgProtocol):
        hot_standby: bool = False,
        lsn: Optional[Lsn] = None,
        config_lines: Optional[List[str]] = None,
        remote_ext_config: Optional[str] = None,
    ) -> "Endpoint":
        """
        Create an endpoint, apply config, and start Postgres.

@@ -2490,7 +2561,7 @@ class Endpoint(PgProtocol):
            config_lines=config_lines,
            hot_standby=hot_standby,
            lsn=lsn,
        ).start()
        ).start(remote_ext_config=remote_ext_config)

        log.info(f"Postgres startup took {time.time() - started_at} seconds")

@@ -2524,6 +2595,7 @@ class EndpointFactory:
        lsn: Optional[Lsn] = None,
        hot_standby: bool = False,
        config_lines: Optional[List[str]] = None,
        remote_ext_config: Optional[str] = None,
    ) -> Endpoint:
        ep = Endpoint(
            self.env,

@@ -2540,6 +2612,7 @@ class EndpointFactory:
            hot_standby=hot_standby,
            config_lines=config_lines,
            lsn=lsn,
            remote_ext_config=remote_ext_config,
        )

    def create(

@@ -89,6 +89,9 @@ class TenantId(Id):
    def __repr__(self) -> str:
        return f'`TenantId("{self.id.hex()}")'

    def __str__(self) -> str:
        return self.id.hex()


class TimelineId(Id):
    def __repr__(self) -> str:

@@ -0,0 +1,24 @@
{
    "public_extensions": [
        "anon"
    ],
    "library_index": {
        "anon": "anon",
        "kq_imcx": "kq_imcx"
    },
    "extension_data": {
        "kq_imcx": {
            "control_data": {
                "kq_imcx.control": "# This file is generated content from add_postgresql_extension.\n# No point in modifying it, it will be overwritten anyway.\n\n# Default version, always set\ndefault_version = '0.1'\n\n# Module pathname generated from target shared library name. Use\n# MODULE_PATHNAME in script file.\nmodule_pathname = '$libdir/kq_imcx.so'\n\n# Comment for extension. Set using COMMENT option. Can be set in\n# script file as well.\ncomment = 'ketteQ In-Memory Calendar Extension (IMCX)'\n\n# Encoding for script file. Set using ENCODING option.\n#encoding = ''\n\n# Required extensions. Set using REQUIRES option (multi-valued).\n#requires = ''\ntrusted = true\n"
            },
            "archive_path": "5670669815/v14/extensions/kq_imcx.tar.zst"
        },
        "anon": {
            "control_data": {
                "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
            },
            "archive_path": "5670669815/v14/extensions/anon.tar.zst"
        }
    }
}

Binary file not shown.
Binary file not shown.
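
The ext_index.json added above (and its v15 counterpart below) is the index the compute side consults: public_extensions lists what is exposed by default, library_index maps a shared-library name to its extension, and extension_data carries the control-file contents plus the path of the .tar.zst archive in the extensions bucket. A minimal reading sketch, assuming the index text has already been fetched from the bucket; the helper name here is made up for illustration and is not part of the diff:

import json

def lookup_extension(index_text: str, name: str):
    # index_text: raw contents of an ext_index.json like the one above
    index = json.loads(index_text)
    entry = index["extension_data"][name]
    # control_data maps "<name>.control" -> control file text;
    # archive_path points at the .tar.zst object under the build prefix
    return entry["control_data"], entry["archive_path"]

# e.g. lookup_extension(text, "kq_imcx")[1] == "5670669815/v14/extensions/kq_imcx.tar.zst"
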
@@ -0,0 +1,24 @@
{
    "public_extensions": [
        "anon"
    ],
    "library_index": {
        "anon": "anon",
        "kq_imcx": "kq_imcx"
    },
    "extension_data": {
        "kq_imcx": {
            "control_data": {
                "kq_imcx.control": "# This file is generated content from add_postgresql_extension.\n# No point in modifying it, it will be overwritten anyway.\n\n# Default version, always set\ndefault_version = '0.1'\n\n# Module pathname generated from target shared library name. Use\n# MODULE_PATHNAME in script file.\nmodule_pathname = '$libdir/kq_imcx.so'\n\n# Comment for extension. Set using COMMENT option. Can be set in\n# script file as well.\ncomment = 'ketteQ In-Memory Calendar Extension (IMCX)'\n\n# Encoding for script file. Set using ENCODING option.\n#encoding = ''\n\n# Required extensions. Set using REQUIRES option (multi-valued).\n#requires = ''\ntrusted = true\n"
            },
            "archive_path": "5670669815/v15/extensions/kq_imcx.tar.zst"
        },
        "anon": {
            "control_data": {
                "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
            },
            "archive_path": "5670669815/v15/extensions/anon.tar.zst"
        }
    }
}

Binary file not shown.
Binary file not shown.
259
test_runner/regress/test_download_extensions.py
Normal file
@@ -0,0 +1,259 @@
import os
import shutil
from contextlib import closing
from pathlib import Path

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    RemoteStorageKind,
    available_s3_storages,
)
from fixtures.pg_version import PgVersion


# Cleaning up downloaded files is important for local tests
# or else one test could reuse the files from another test or another test run
def cleanup(pg_version):
    PGDIR = Path(f"pg_install/v{pg_version}")

    LIB_DIR = PGDIR / Path("lib/postgresql")
    cleanup_lib_globs = ["anon*", "postgis*"]
    cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]

    SHARE_DIR = PGDIR / Path("share/postgresql/extension")
    cleanup_ext_globs = [
        "anon*",
        "address_standardizer*",
        "postgis*",
        "pageinspect*",
        "pg_buffercache*",
        "pgrouting*",
    ]
    cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]

    all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
    all_cleanup_files = []
    for file_glob in all_glob_paths:
        for file in file_glob:
            all_cleanup_files.append(file)

    for file in all_cleanup_files:
        try:
            os.remove(file)
            log.info(f"removed file {file}")
        except Exception as err:
            log.info(f"error removing file {file}: {err}")

    cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
    for folder in cleanup_folders:
        try:
            shutil.rmtree(folder)
            log.info(f"removed folder {folder}")
        except Exception as err:
            log.info(f"error removing folder {folder}: {err}")


def upload_files(env):
    log.info("Uploading test files to mock bucket")
    os.chdir("test_runner/regress/data/extension_test")
    for path in os.walk("."):
        prefix, _, files = path
        for file in files:
            # the [2:] is to remove the leading "./"
            full_path = os.path.join(prefix, file)[2:]

            with open(full_path, "rb") as f:
                log.info(f"UPLOAD {full_path} to ext/{full_path}")
                env.remote_storage_client.upload_fileobj(
                    f,
                    env.ext_remote_storage.bucket_name,
                    f"ext/{full_path}",
                )
    os.chdir("../../../..")


"""
|
||||
# Test downloading remote extension.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
def test_remote_extensions(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_extensions",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# Start a compute node and check that it can download the extensions
|
||||
# and use them to CREATE EXTENSION and LOAD
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_extensions",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate control files were downloaded
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "anon" in all_extensions
|
||||
assert "kq_imcx" in all_extensions
|
||||
|
||||
# postgis is on real s3 but not mock s3.
|
||||
# it's kind of a big file, would rather not upload to github
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
assert "postgis" in all_extensions
|
||||
# this is expected to break on my computer because I lack the necesary dependencies
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION postgis")
|
||||
except Exception as err:
|
||||
log.info(f"(expected) error creating postgis extension: {err}")
|
||||
|
||||
# this is expected to fail on my computer because I don't have the pgcrypto extension
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION anon")
|
||||
except Exception as err:
|
||||
log.info("error creating anon extension")
|
||||
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Test downloading remote library.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
def test_remote_library(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_library",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# and use them to run LOAD library
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_library",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# try to load library
|
||||
try:
|
||||
cur.execute("LOAD 'anon'")
|
||||
except Exception as err:
|
||||
log.info(f"error loading anon library: {err}")
|
||||
raise AssertionError("unexpected error loading anon library") from err
|
||||
|
||||
# test library which name is different from extension name
|
||||
# this fails on my computer because I' missing a dependency
|
||||
# however, it does successfully download the postgis archive
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
try:
|
||||
cur.execute("LOAD 'postgis_topology-3'")
|
||||
except Exception as err:
|
||||
log.info("error loading postgis_topology-3")
|
||||
assert "cannot open shared object file: No such file or directory" in str(
|
||||
err
|
||||
), "unexpected error loading postgis_topology-3"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
"""
|
||||
|
||||
|
||||
# Test extension downloading with multiple connections to an endpoint.
# this test only supports real s3 because postgis is too large an extension to
# put in our github repo
def test_interrupted_extension(
    neon_env_builder: NeonEnvBuilder,
    pg_version: PgVersion,
):
    if "15" in pg_version:  # SKIP v15 for now
        return None

    neon_env_builder.enable_remote_storage(
        remote_storage_kind=RemoteStorageKind.REAL_S3,
        test_name="test_interrupted_extension",
        enable_remote_extensions=True,
    )
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()
    tenant_id, _ = env.neon_cli.create_tenant()
    env.neon_cli.create_timeline("test_interrupted_extension", tenant_id=tenant_id)

    assert env.ext_remote_storage is not None  # satisfy mypy
    assert env.remote_storage_client is not None  # satisfy mypy

    endpoint = env.endpoints.create_start(
        "test_interrupted_extension",
        tenant_id=tenant_id,
        remote_ext_config=env.ext_remote_storage.to_string(),
    )
    with closing(endpoint.connect()) as conn:
        with conn.cursor() as cur:
            # cur.execute("CREATE EXTENSION address_standardizer;")
            cur.execute("CREATE EXTENSION address_standardizer_data_us;")
            # execute query to ensure that it works
            cur.execute(
                "SELECT house_num, name, suftype, city, country, state, unit \
                FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
                'One Rust Place, Boston, MA 02109');"
            )
    # the endpoint is closed now
    # remove postgis files locally
    cleanup(pg_version)

    # # spin up compute node again (there are no postgis files available, because compute is stateless)
    # endpoint = env.endpoints.create_start(
    #     "test_remote_library",
    #     tenant_id=tenant_id,
    #     remote_ext_config=env.ext_remote_storage.to_string(),
    # )
    # # connect to postgres and execute the query again
    # with closing(endpoint.connect()) as conn:
    #     with conn.cursor() as cur:
    #         cur.execute("CREATE EXTENSION address_standardizer;")
    #         cur.execute("CREATE EXTENSION address_standardizer_data_us;")
    #         # execute query to ensure that it works
    #         cur.execute(
    #             "SELECT house_num, name, suftype, city, country, state, unit \
    #             FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
    #             'One Rust Place, Boston, MA 02109');"
    #         )
@@ -276,6 +276,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
    assert isinstance(neon_env_builder.remote_storage, S3Storage)

    # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
    assert neon_env_builder.remote_storage_client is not None
    response = neon_env_builder.remote_storage_client.list_objects_v2(
        Bucket=neon_env_builder.remote_storage.bucket_name,
        Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",

@@ -630,7 +631,7 @@ def test_timeline_delete_works_for_remote_smoke(
    )

    # for some reason the check above doesnt immediately take effect for the below.
    # Assume it is mock server incosistency and check twice.
    # Assume it is mock server inconsistency and check twice.
    wait_until(
        2,
        0.5,

2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 12c5dc8281...93a5ee7749
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: e3fbfc4d14...293a06e5e1
4
vendor/revisions.json
vendored
@@ -1,4 +1,4 @@
{
    "postgres-v15": "e3fbfc4d143b2d3c3c1813ce747f8af35aa9405e",
    "postgres-v14": "12c5dc8281d20b5bd636e1097eea80a7bc609591"
    "postgres-v15": "293a06e5e14ed9be3f5002c63b4fac391491ec17",
    "postgres-v14": "93a5ee7749f109ecb9e5481be485c8cb17fe72ce"
}

@@ -60,6 +60,7 @@ url = { version = "2", features = ["serde"] }

[build-dependencies]
anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
either = { version = "1" }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }