diff --git a/Cargo.lock b/Cargo.lock
index 4be74614c2..b6879bfcb0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -924,12 +924,14 @@ dependencies = [
  "opentelemetry",
  "postgres",
  "regex",
+ "remote_storage",
  "reqwest",
  "serde",
  "serde_json",
  "tar",
  "tokio",
  "tokio-postgres",
+ "toml_edit",
  "tracing",
  "tracing-opentelemetry",
  "tracing-subscriber",
@@ -997,6 +999,7 @@ dependencies = [
  "tar",
  "thiserror",
  "toml",
+ "tracing",
  "url",
  "utils",
  "workspace_hack",
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index 21226249cf..43d122c90d 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -30,3 +30,5 @@ url.workspace = true
 compute_api.workspace = true
 utils.workspace = true
 workspace_hack.workspace = true
+toml_edit.workspace = true
+remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 90b39e9dd9..e3dd65d3c4 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -5,6 +5,8 @@
 //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
 //! - Every start is a fresh start, so the data directory is removed and
 //!   initialized again on each run.
+//! - If `--remote-ext-config` is provided, it is used to fetch the list of available
+//!   extensions and to download `shared_preload_libraries` from remote extension storage.
 //! - Next it will put configuration files into the `PGDATA` directory.
 //! - Sync safekeepers and get commit LSN.
 //! - Get `basebackup` from pageserver using the returned on the previous step LSN.
@@ -27,7 +29,8 @@
 //! compute_ctl -D /var/db/postgres/compute \
 //!             -C 'postgresql://cloud_admin@localhost/postgres' \
 //!             -S /var/db/postgres/specs/current.json \
-//!             -b /usr/local/bin/postgres
+//!             -b /usr/local/bin/postgres \
+//!             -r '{"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http://localhost:9000"}'
 //! ```
 //!
use std::collections::HashMap; @@ -35,7 +38,7 @@ use std::fs::File; use std::panic; use std::path::Path; use std::process::exit; -use std::sync::{mpsc, Arc, Condvar, Mutex}; +use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock}; use std::{thread, time::Duration}; use anyhow::{Context, Result}; @@ -48,6 +51,7 @@ use compute_api::responses::ComputeStatus; use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_tools::configurator::launch_configurator; +use compute_tools::extension_server::{get_pg_version, init_remote_storage}; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -64,6 +68,22 @@ fn main() -> Result<()> { info!("build_tag: {build_tag}"); let matches = cli().get_matches(); + let pgbin_default = String::from("postgres"); + let pgbin = matches.get_one::("pgbin").unwrap_or(&pgbin_default); + + let remote_ext_config = matches.get_one::("remote-ext-config"); + let ext_remote_storage = match remote_ext_config { + Some(x) => match init_remote_storage(x, build_tag) { + Ok(y) => Some(y), + Err(e) => { + panic!( + "cannot initialize remote extension storage from config {}: {}", + x, e + ); + } + }, + None => None, + }; let http_port = *matches .get_one::("http-port") @@ -128,9 +148,6 @@ fn main() -> Result<()> { let compute_id = matches.get_one::("compute-id"); let control_plane_uri = matches.get_one::("control-plane-uri"); - // Try to use just 'postgres' if no path is provided - let pgbin = matches.get_one::("pgbin").unwrap(); - let spec; let mut live_config_allowed = false; match spec_json { @@ -168,6 +185,7 @@ fn main() -> Result<()> { let mut new_state = ComputeState::new(); let spec_set; + if let Some(spec) = spec { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; new_state.pspec = Some(pspec); @@ -179,9 +197,13 @@ fn main() -> Result<()> { connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), + pgversion: get_pg_version(pgbin), live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), + ext_remote_storage, + available_libraries: OnceLock::new(), + available_extensions: OnceLock::new(), }; let compute = Arc::new(compute_node); @@ -190,6 +212,8 @@ fn main() -> Result<()> { let _http_handle = launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); + let extension_server_port: u16 = http_port; + if !spec_set { // No spec provided, hang waiting for it. 
info!("no compute spec provided, waiting"); @@ -230,7 +254,7 @@ fn main() -> Result<()> { // Start Postgres let mut delay_exit = false; let mut exit_code = None; - let pg = match compute.start_compute() { + let pg = match compute.start_compute(extension_server_port) { Ok(pg) => Some(pg), Err(err) => { error!("could not start the compute node: {:?}", err); @@ -349,6 +373,12 @@ fn cli() -> clap::Command { .long("control-plane-uri") .value_name("CONTROL_PLANE_API_BASE_URI"), ) + .arg( + Arg::new("remote-ext-config") + .short('r') + .long("remote-ext-config") + .value_name("REMOTE_EXT_CONFIG"), + ) } #[test] diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 87acefc1bb..840fe84611 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,13 +1,15 @@ +use std::collections::HashMap; use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; -use std::sync::{Condvar, Mutex}; +use std::sync::{Condvar, Mutex, OnceLock}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use postgres::{Client, NoTls}; +use tokio; use tokio_postgres; use tracing::{info, instrument, warn}; use utils::id::{TenantId, TimelineId}; @@ -16,9 +18,11 @@ use utils::lsn::Lsn; use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeMode, ComputeSpec}; -use crate::config; +use remote_storage::{GenericRemoteStorage, RemotePath}; + use crate::pg_helpers::*; use crate::spec::*; +use crate::{config, extension_server}; /// Compute node info shared across several `compute_ctl` threads. pub struct ComputeNode { @@ -26,6 +30,7 @@ pub struct ComputeNode { pub connstr: url::Url, pub pgdata: String, pub pgbin: String, + pub pgversion: String, /// We should only allow live re- / configuration of the compute node if /// it uses 'pull model', i.e. it can go to control-plane and fetch /// the latest configuration. Otherwise, there could be a case: @@ -45,6 +50,11 @@ pub struct ComputeNode { pub state: Mutex, /// `Condvar` to allow notifying waiters about state changes. pub state_changed: Condvar, + /// the S3 bucket that we search for extensions in + pub ext_remote_storage: Option, + // cached lists of available extensions and libraries + pub available_libraries: OnceLock>, + pub available_extensions: OnceLock>>, } #[derive(Clone, Debug)] @@ -323,14 +333,22 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. #[instrument(skip_all)] - pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { + pub fn prepare_pgdata( + &self, + compute_state: &ComputeState, + extension_server_port: u16, + ) -> Result<()> { let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let spec = &pspec.spec; let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. 
self.create_pgdata()?; - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?; + config::write_postgres_conf( + &pgdata_path.join("postgresql.conf"), + &pspec.spec, + Some(extension_server_port), + )?; // Syncing safekeepers is only safe with primary nodes: if a primary // is already connected it will be kicked out, so a secondary (standby) @@ -472,7 +490,7 @@ impl ComputeNode { // Write new config let pgdata_path = Path::new(&self.pgdata); - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?; let mut client = Client::connect(self.connstr.as_str(), NoTls)?; self.pg_reload_conf(&mut client)?; @@ -502,7 +520,7 @@ impl ComputeNode { } #[instrument(skip_all)] - pub fn start_compute(&self) -> Result { + pub fn start_compute(&self, extension_server_port: u16) -> Result { let compute_state = self.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); info!( @@ -513,7 +531,9 @@ impl ComputeNode { pspec.timeline_id, ); - self.prepare_pgdata(&compute_state)?; + self.prepare_external_extensions(&compute_state)?; + + self.prepare_pgdata(&compute_state, extension_server_port)?; let start_time = Utc::now(); @@ -649,4 +669,132 @@ LIMIT 100", "{{\"pg_stat_statements\": []}}".to_string() } } + + // If remote extension storage is configured, + // download extension control files + // and shared preload libraries. + #[tokio::main] + pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> { + if let Some(ref ext_remote_storage) = self.ext_remote_storage { + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + // download preload shared libraries before postgres start (if any) + let spec = &pspec.spec; + + // 1. parse custom extension paths from spec + let custom_ext_prefixes = match &spec.custom_extensions { + Some(custom_extensions) => custom_extensions.clone(), + None => Vec::new(), + }; + + info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes); + + // 2. 
parse shared_preload_libraries from spec + let mut libs_vec = Vec::new(); + + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { + libs_vec = libs + .split(&[',', '\'', ' ']) + .filter(|s| *s != "neon" && !s.is_empty()) + .map(str::to_string) + .collect(); + } + + info!( + "shared_preload_libraries parsed from spec.cluster.settings: {:?}", + libs_vec + ); + + // also parse shared_preload_libraries from provided postgresql.conf + // that is used in neon_local and python tests + if let Some(conf) = &spec.cluster.postgresql_conf { + let conf_lines = conf.split('\n').collect::>(); + + let mut shared_preload_libraries_line = ""; + for line in conf_lines { + if line.starts_with("shared_preload_libraries") { + shared_preload_libraries_line = line; + } + } + + let mut preload_libs_vec = Vec::new(); + if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) { + preload_libs_vec = libs + .split(&[',', '\'', ' ']) + .filter(|s| *s != "neon" && !s.is_empty()) + .map(str::to_string) + .collect(); + } + + info!( + "shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}", + preload_libs_vec + ); + + libs_vec.extend(preload_libs_vec); + } + + info!("Libraries to download: {:?}", &libs_vec); + + // download extension control files & shared_preload_libraries + + let available_extensions = extension_server::get_available_extensions( + ext_remote_storage, + &self.pgbin, + &self.pgversion, + &custom_ext_prefixes, + ) + .await?; + self.available_extensions + .set(available_extensions) + .expect("available_extensions.set error"); + + let available_libraries = extension_server::get_available_libraries( + ext_remote_storage, + &self.pgbin, + &self.pgversion, + &custom_ext_prefixes, + &libs_vec, + ) + .await?; + self.available_libraries + .set(available_libraries) + .expect("available_libraries.set error"); + } + + Ok(()) + } + + pub async fn download_extension_files(&self, filename: String) -> Result<()> { + match &self.ext_remote_storage { + None => anyhow::bail!("No remote extension storage"), + Some(remote_storage) => { + extension_server::download_extension_files( + &filename, + remote_storage, + &self.pgbin, + self.available_extensions + .get() + .context("available_extensions broke")?, + ) + .await + } + } + } + + pub async fn download_library_file(&self, filename: String) -> Result<()> { + match &self.ext_remote_storage { + None => anyhow::bail!("No remote extension storage"), + Some(remote_storage) => { + extension_server::download_library_file( + &filename, + remote_storage, + &self.pgbin, + self.available_libraries + .get() + .context("available_libraries broke")?, + ) + .await + } + } + } } diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 99346433d0..1b9d5037d5 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result { } /// Create or completely rewrite configuration file specified by `path` -pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> { +pub fn write_postgres_conf( + path: &Path, + spec: &ComputeSpec, + extension_server_port: Option, +) -> Result<()> { // File::create() destroys the file content if it exists. 
let mut file = File::create(path)?; @@ -95,5 +99,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> { writeln!(file, "# Managed by compute_ctl: end")?; } + if let Some(port) = extension_server_port { + writeln!(file, "neon.extension_server_port={}", port)?; + } + Ok(()) } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs new file mode 100644 index 0000000000..11b42abf8c --- /dev/null +++ b/compute_tools/src/extension_server.rs @@ -0,0 +1,384 @@ +// Download extension files from the extension store +// and put them in the right place in the postgres directory +use anyhow::{self, bail, Context, Result}; +use remote_storage::*; +use serde_json::{self, Value}; +use std::collections::HashMap; +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::num::{NonZeroU32, NonZeroUsize}; +use std::path::{Path, PathBuf}; +use std::str; +use tokio::io::AsyncReadExt; +use tracing::info; + +const SHARE_EXT_PATH: &str = "share/postgresql/extension"; + +fn get_pg_config(argument: &str, pgbin: &str) -> String { + // gives the result of `pg_config [argument]` + // where argument is a flag like `--version` or `--sharedir` + let pgconfig = pgbin.replace("postgres", "pg_config"); + let config_output = std::process::Command::new(pgconfig) + .arg(argument) + .output() + .expect("pg_config error"); + std::str::from_utf8(&config_output.stdout) + .expect("pg_config error") + .trim() + .to_string() +} + +pub fn get_pg_version(pgbin: &str) -> String { + // pg_config --version returns a (platform specific) human readable string + // such as "PostgreSQL 15.4". We parse this to v14/v15 + let human_version = get_pg_config("--version", pgbin); + if human_version.contains("15") { + return "v15".to_string(); + } else if human_version.contains("14") { + return "v14".to_string(); + } + panic!("Unsuported postgres version {human_version}"); +} + +async fn download_helper( + remote_storage: &GenericRemoteStorage, + remote_from_path: &RemotePath, + remote_from_prefix: Option<&Path>, + download_location: &Path, +) -> anyhow::Result<()> { + // downloads file at remote_from_path to download_location/[file_name] + + // we cannot use remote_from_path.object_name() here + // because extension files can be in subdirectories of the extension store. + // + // To handle this, we use remote_from_prefix to strip the prefix from the path + // this gives us the relative path of the file in the extension store, + // and we use this relative path to construct the local path. + // + let local_path = match remote_from_prefix { + Some(prefix) => { + let p = remote_from_path + .get_path() + .strip_prefix(prefix) + .expect("bad prefix"); + + download_location.join(p) + } + None => download_location.join(remote_from_path.object_name().expect("bad object")), + }; + + if local_path.exists() { + info!("File {:?} already exists. Skipping download", &local_path); + return Ok(()); + } + + info!( + "Downloading {:?} to location {:?}", + &remote_from_path, &local_path + ); + let mut download = remote_storage.download(remote_from_path).await?; + let mut write_data_buffer = Vec::new(); + download + .download_stream + .read_to_end(&mut write_data_buffer) + .await?; + if remote_from_prefix.is_some() { + if let Some(prefix) = local_path.parent() { + info!( + "Downloading file with prefix. 
Create directory {:?}", + prefix + ); + // if directory already exists, this is a no-op + std::fs::create_dir_all(prefix)?; + } + } + + let mut output_file = BufWriter::new(File::create(local_path)?); + output_file.write_all(&write_data_buffer)?; + Ok(()) +} + +// download extension control files +// +// if custom_ext_prefixes is provided - search also in custom extension paths +// +pub async fn get_available_extensions( + remote_storage: &GenericRemoteStorage, + pgbin: &str, + pg_version: &str, + custom_ext_prefixes: &Vec, +) -> anyhow::Result>> { + let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + + let mut paths: Vec = Vec::new(); + // public extensions + paths.push(RemotePath::new( + &Path::new(pg_version).join(SHARE_EXT_PATH), + )?); + // custom extensions + for custom_prefix in custom_ext_prefixes { + paths.push(RemotePath::new( + &Path::new(pg_version) + .join(custom_prefix) + .join(SHARE_EXT_PATH), + )?); + } + + let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?; + + info!( + "list of available_extension files {:?}", + &all_available_files + ); + + // download all control files + for (obj_name, obj_paths) in &all_available_files { + for obj_path in obj_paths { + if obj_name.ends_with("control") { + download_helper(remote_storage, obj_path, None, &local_sharedir).await?; + } + } + } + + Ok(all_available_files) +} + +// Download requested shared_preload_libraries +// +// Note that tenant_id is not optional here, because we only download libraries +// after we know the tenant spec and the tenant_id. +// +// return list of all library files to use it in the future searches +pub async fn get_available_libraries( + remote_storage: &GenericRemoteStorage, + pgbin: &str, + pg_version: &str, + custom_ext_prefixes: &Vec, + preload_libraries: &Vec, +) -> anyhow::Result> { + let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); + // Construct a hashmap of all available libraries + // example (key, value) pair: test_lib0.so, v14/lib/test_lib0.so + + let mut paths: Vec = Vec::new(); + // public libraries + paths.push( + RemotePath::new(&Path::new(&pg_version).join("lib/")) + .expect("The hard coded path here is valid"), + ); + // custom libraries + for custom_prefix in custom_ext_prefixes { + paths.push( + RemotePath::new(&Path::new(&pg_version).join(custom_prefix).join("lib")) + .expect("The hard coded path here is valid"), + ); + } + + let all_available_libraries = list_files_in_prefixes(remote_storage, &paths).await?; + + info!("list of library files {:?}", &all_available_libraries); + + // download all requested libraries + for lib_name in preload_libraries { + // add file extension if it isn't in the filename + let lib_name_with_ext = enforce_so_end(lib_name); + info!("looking for library {:?}", &lib_name_with_ext); + + match all_available_libraries.get(&*lib_name_with_ext) { + Some(remote_path) => { + download_helper(remote_storage, remote_path, None, &local_libdir).await? + } + None => { + let file_path = local_libdir.join(&lib_name_with_ext); + if file_path.exists() { + info!("File {:?} already exists. 
Skipping download", &file_path); + } else { + bail!("Shared library file {lib_name} is not found in the extension store") + } + } + } + } + + Ok(all_available_libraries) +} + +// download all sqlfiles (and possibly data files) for a given extension name +// +pub async fn download_extension_files( + ext_name: &str, + remote_storage: &GenericRemoteStorage, + pgbin: &str, + all_available_files: &HashMap>, +) -> Result<()> { + let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + let mut downloaded_something = false; + + if let Some(files) = all_available_files.get(ext_name) { + for file in files { + if file.extension().context("bad file name")? != "control" { + // find files prefix to handle cases when extension files are stored + // in a directory with the same name as the extension + // example: + // share/postgresql/extension/extension_name/extension_name--1.0.sql + let index = file + .get_path() + .to_str() + .context("invalid path")? + .find(ext_name) + .context("invalid path")?; + + let prefix_str = + file.get_path().to_str().context("invalid path")?[..index].to_string(); + let remote_from_prefix = if prefix_str.is_empty() { + None + } else { + Some(Path::new(&prefix_str)) + }; + + download_helper(remote_storage, file, remote_from_prefix, &local_sharedir).await?; + downloaded_something = true; + } + } + } + if !downloaded_something { + bail!("Files for extension {ext_name} are not found in the extension store"); + } + Ok(()) +} + +// appends an .so suffix to libname if it does not already have one +fn enforce_so_end(libname: &str) -> String { + if !libname.ends_with(".so") { + format!("{}.so", libname) + } else { + libname.to_string() + } +} + +// download shared library file +pub async fn download_library_file( + lib_name: &str, + remote_storage: &GenericRemoteStorage, + pgbin: &str, + all_available_libraries: &HashMap, +) -> Result<()> { + let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); + let lib_name_with_ext = enforce_so_end(lib_name); + info!("looking for library {:?}", &lib_name_with_ext); + match all_available_libraries.get(&*lib_name_with_ext) { + Some(remote_path) => { + download_helper(remote_storage, remote_path, None, &local_libdir).await? 
+ } + None => bail!("Shared library file {lib_name} is not found in the extension store"), + } + Ok(()) +} + +pub fn init_remote_storage( + remote_ext_config: &str, + default_prefix: &str, +) -> anyhow::Result { + let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?; + + let remote_ext_bucket = match &remote_ext_config["bucket"] { + Value::String(x) => x, + _ => bail!("remote_ext_config missing bucket"), + }; + let remote_ext_region = match &remote_ext_config["region"] { + Value::String(x) => x, + _ => bail!("remote_ext_config missing region"), + }; + let remote_ext_endpoint = match &remote_ext_config["endpoint"] { + Value::String(x) => Some(x.clone()), + _ => None, + }; + let remote_ext_prefix = match &remote_ext_config["prefix"] { + Value::String(x) => Some(x.clone()), + // if prefix is not provided, use default, which is the build_tag + _ => Some(default_prefix.to_string()), + }; + + // load will not be large, so default parameters are fine + let config = S3Config { + bucket_name: remote_ext_bucket.to_string(), + bucket_region: remote_ext_region.to_string(), + prefix_in_bucket: remote_ext_prefix, + endpoint: remote_ext_endpoint, + concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"), + max_keys_per_list_response: None, + }; + let config = RemoteStorageConfig { + max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"), + max_sync_errors: NonZeroU32::new(100).expect("100 != 0"), + storage: RemoteStorageKind::AwsS3(config), + }; + GenericRemoteStorage::from_config(&config) +} + +// helper to collect all files in the given prefixes +// returns hashmap of (file_name, file_remote_path) +async fn list_files_in_prefixes( + remote_storage: &GenericRemoteStorage, + paths: &Vec, +) -> Result> { + let mut res = HashMap::new(); + + for path in paths { + for file in remote_storage.list_files(Some(path)).await? { + res.insert( + file.object_name().expect("bad object").to_owned(), + file.to_owned(), + ); + } + } + + Ok(res) +} + +// helper to extract extension name +// extension files can be in subdirectories of the extension store. +// examples of layout: +// +// share/postgresql/extension/extension_name--1.0.sql +// +// or +// +// share/postgresql/extension/extension_name/extension_name--1.0.sql +// share/postgresql/extension/extension_name/extra_data.csv +// +// Note: we **assume** that the extension files is in one of these formats. +// If it is not, this code will not download it. +fn get_ext_name(path: &str) -> Result<&str> { + let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect(); + + let path_suffix = path_suffix.last().expect("bad ext name"); + // the order of these is important + // otherwise we'll return incorrect extension name + // for path like share/postgresql/extension/extension_name/extension_name--1.0.sql + for index in ["/", "--"] { + if let Some(index) = path_suffix.find(index) { + return Ok(&path_suffix[..index]); + } + } + Ok(path_suffix) +} + +// helper to collect files of given prefixes for extensions +// and group them by extension +// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension) +async fn list_files_in_prefixes_for_extensions( + remote_storage: &GenericRemoteStorage, + paths: &Vec, +) -> Result>> { + let mut result = HashMap::new(); + for path in paths { + for file in remote_storage.list_files(Some(path)).await? 
{ + let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?; + let ext_file_list = result + .entry(file_ext_name.to_string()) + .or_insert(Vec::new()); + ext_file_list.push(file.to_owned()); + } + } + Ok(result) +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index afd9c2fb54..0fbb334199 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -121,6 +121,55 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving {:?} POST request", route); + info!("req.uri {:?}", req.uri()); + + let mut is_library = false; + + if let Some(params) = req.uri().query() { + info!("serving {:?} POST request with params: {}", route, params); + + if params == "is_library=true" { + is_library = true; + } else { + let mut resp = Response::new(Body::from("Wrong request parameters")); + *resp.status_mut() = StatusCode::BAD_REQUEST; + return resp; + } + } + + let filename = route.split('/').last().unwrap().to_string(); + + info!( + "serving /extension_server POST request, filename: {:?} is_library: {}", + filename, is_library + ); + + if is_library { + match compute.download_library_file(filename.to_string()).await { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("library download failed: {}", e); + let mut resp = Response::new(Body::from(e.to_string())); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + resp + } + } + } else { + match compute.download_extension_files(filename.to_string()).await { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("extension download failed: {}", e); + let mut resp = Response::new(Body::from(e.to_string())); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + resp + } + } + } + } + // Return the `404 Not Found` for any other routes. _ => { let mut not_found = Response::new(Body::from("404 Not Found")); diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 2680269756..dc26cc63eb 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -139,6 +139,34 @@ paths: application/json: schema: $ref: "#/components/schemas/GenericError" + /extension_server: + post: + tags: + - Extension + summary: Download extension from S3 to local folder. + description: "" + operationId: downloadExtension + responses: + 200: + description: Extension downloaded + content: + text/plain: + schema: + type: string + description: Error text or 'OK' if download succeeded. + example: "OK" + 400: + description: Request is invalid. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + 500: + description: Extension download request failed. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" components: securitySchemes: diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 24811f75ee..c061ab2da3 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -9,6 +9,7 @@ pub mod http; #[macro_use] pub mod logger; pub mod compute; +pub mod extension_server; pub mod monitor; pub mod params; pub mod pg_helpers; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 520696da00..b694b512ab 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane( pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> { // File `postgresql.conf` is no longer included into `basebackup`, so just // always write all config into it creating new file. - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?; update_pg_hba(pgdata_path)?; diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index a341ff0263..d2c99c5f36 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -32,3 +32,4 @@ utils.workspace = true compute_api.workspace = true workspace_hack.workspace = true +tracing.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 8995a18564..8f71cb65e2 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -658,6 +658,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .get_one::("endpoint_id") .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?; + let remote_ext_config = sub_args.get_one::("remote-ext-config"); + // If --safekeepers argument is given, use only the listed safekeeper nodes. let safekeepers = if let Some(safekeepers_str) = sub_args.get_one::("safekeepers") { @@ -699,7 +701,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( _ => {} } println!("Starting existing endpoint {endpoint_id}..."); - endpoint.start(&auth_token, safekeepers)?; + endpoint.start(&auth_token, safekeepers, remote_ext_config)?; } else { let branch_name = sub_args .get_one::("branch-name") @@ -743,7 +745,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( pg_version, mode, )?; - ep.start(&auth_token, safekeepers)?; + ep.start(&auth_token, safekeepers, remote_ext_config)?; } } "stop" => { @@ -1003,6 +1005,12 @@ fn cli() -> Command { .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more") .required(false); + let remote_ext_config_args = Arg::new("remote-ext-config") + .long("remote-ext-config") + .num_args(1) + .help("Configure the S3 bucket that we search for extensions in.") + .required(false); + let lsn_arg = Arg::new("lsn") .long("lsn") .help("Specify Lsn on the timeline to start from. 
By default, end of the timeline would be used.") @@ -1161,6 +1169,7 @@ fn cli() -> Command { .arg(pg_version_arg) .arg(hot_standby_arg) .arg(safekeepers_arg) + .arg(remote_ext_config_args) ) .subcommand( Command::new("stop") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 52683ff1c3..07d33478bd 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -311,7 +311,7 @@ impl Endpoint { // TODO: use future host field from safekeeper spec // Pass the list of safekeepers to the replica so that it can connect to any of them, - // whichever is availiable. + // whichever is available. let sk_ports = self .env .safekeepers @@ -408,7 +408,12 @@ impl Endpoint { Ok(()) } - pub fn start(&self, auth_token: &Option, safekeepers: Vec) -> Result<()> { + pub fn start( + &self, + auth_token: &Option, + safekeepers: Vec, + remote_ext_config: Option<&String>, + ) -> Result<()> { if self.status() == "running" { anyhow::bail!("The endpoint is already running"); } @@ -476,6 +481,13 @@ impl Endpoint { pageserver_connstring: Some(pageserver_connstring), safekeeper_connstrings, storage_auth_token: auth_token.clone(), + // This is a hack to test custom extensions locally. + // In test_download_extensions, we assume that the custom extension + // prefix is the tenant ID. So we set it here. + // + // The proper way to implement this is to pass the custom extension + // in spec, but we don't have a way to do that yet in the python tests. + custom_extensions: Some(vec![self.tenant_id.to_string()]), }; let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; @@ -507,6 +519,9 @@ impl Endpoint { .stdin(std::process::Stdio::null()) .stderr(logfile.try_clone()?) .stdout(logfile); + if let Some(remote_ext_config) = remote_ext_config { + cmd.args(["--remote-ext-config", remote_ext_config]); + } let _child = cmd.spawn()?; // Wait for it to start diff --git a/docs/rfcs/024-extension-loading.md b/docs/rfcs/024-extension-loading.md new file mode 100644 index 0000000000..bc9ed4407b --- /dev/null +++ b/docs/rfcs/024-extension-loading.md @@ -0,0 +1,183 @@ +# Supporting custom user Extensions (Dynamic Extension Loading) +Created 2023-05-03 + +## Motivation + +There are many extensions in the PostgreSQL ecosystem, and not all extensions +are of a quality that we can confidently support them. Additionally, our +current extension inclusion mechanism has several problems because we build all +extensions into the primary Compute image: We build the extensions every time +we build the compute image regardless of whether we actually need to rebuild +the image, and the inclusion of these extensions in the image adds a hard +dependency on all supported extensions - thus increasing the image size, and +with it the time it takes to download that image - increasing first start +latency. + +This RFC proposes a dynamic loading mechanism that solves most of these +problems. + +## Summary + +`compute_ctl` is made responsible for loading extensions on-demand into +the container's file system for dynamically loaded extensions, and will also +make sure that the extensions in `shared_preload_libraries` are downloaded +before the compute node starts. + +## Components + +compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store + +## Requirements + +Compute nodes with no extra extensions should not be negatively impacted by +the existence of support for many extensions. 
+
+Installing an extension into PostgreSQL should be easy.
+
+Non-preloaded extensions shouldn't impact startup latency.
+
+Uninstalled extensions shouldn't impact query latency.
+
+A small latency penalty for dynamically loaded extensions is acceptable in
+the first seconds of compute startup, but not in steady-state operations.
+
+## Proposed implementation
+
+### On-demand, JIT-loading of extensions
+
+Before postgres starts we download
+- control files for all extensions available to that compute node;
+- all `shared_preload_libraries`.
+
+After postgres is running, `compute_ctl` listens for requests to load files.
+When PostgreSQL requests a file, `compute_ctl` downloads it.
+
+PostgreSQL requests files in the following cases:
+- when loading a preload library set in `local_preload_libraries`;
+- when explicitly loading a library with `LOAD`;
+- when creating an extension with `CREATE EXTENSION` (download SQL scripts, plus
+  optional extension data files and library files).
+
+A sketch of this request flow is shown below, after the list of alternatives.
+
+#### Summary
+
+Pros:
+ - Startup is only as slow as it takes to load all (shared_)preload_libraries
+ - Supports bring-your-own extensions
+
+Cons:
+ - O(sizeof(extensions)) IO requirement for loading all extensions.
+
+### Alternative solutions
+
+1. Allow users to add their extensions to the base image
+
+   Pros:
+   - Easy to deploy
+
+   Cons:
+   - Doesn't scale: first-start size depends on image size
+   - All extensions are shared across all users: it doesn't allow users to
+     bring their own restrictively licensed extensions
+
+2. Bring Your Own compute image
+
+   Pros:
+   - Still easy to deploy
+   - Users can bring their own patched version of PostgreSQL
+
+   Cons:
+   - First-start latency is O(sizeof(extensions image))
+   - A warm instance pool for skipping pod-schedule latency is not feasible with
+     O(n) custom images
+   - Support channels are difficult to manage
+
+3. Download all user extensions in bulk on compute start
+
+   Pros:
+   - Easy to deploy
+   - No startup latency issues for "clean" users
+   - A warm instance pool for skipping pod-schedule latency is possible
+
+   Cons:
+   - Downloading all extensions in advance takes a lot of time, hence startup
+     latency issues
+
+4. Store users' extensions in persistent storage
+
+   Pros:
+   - Easy to deploy
+   - No startup latency issues
+   - A warm instance pool for skipping pod-schedule latency is possible
+
+   Cons:
+   - EC2 instances have only a limited number of attachments shared between EBS
+     volumes, direct-attached NVMe drives, and ENIs
+   - Compute instance migration isn't trivially solved for EBS mounts (e.g.
+     the device is unavailable whilst moving the mount between instances)
+   - EBS can only be mounted on one instance at a time (except the expensive io2
+     device type)
+
+5. Store users' extensions on a network drive
+
+   Pros:
+   - Easy to deploy
+   - Few startup latency issues
+   - A warm instance pool for skipping pod-schedule latency is possible
+
+   Cons:
+   - We'd need networked drives, and a lot of them, which would store many
+     duplicate extensions
+   - **UNCHECKED:** compute instance migration may not work nicely with
+     networked IO
+
+### Idea extensions
+
+The extension store does not have to be S3 directly, but could be a node-local
+caching service on top of S3. This would reduce the load on the network for
+popular extensions.
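+
+### Request flow sketch
+
+A minimal illustration of the on-demand download protocol described above. It is not part
+of the implementation; it assumes `compute_ctl` serves its HTTP API on localhost port 8080
+and that a `postgis` build exists in the extension store. The `/extension_server/<filename>`
+route and the `is_library` query parameter are the ones added by this change.
+
+```python
+import requests
+
+COMPUTE_CTL = "http://localhost:8080"  # assumed local compute_ctl HTTP address
+
+# What `CREATE EXTENSION postgis` triggers via the download hook:
+# compute_ctl fetches postgis--*.sql (and any data files) into the sharedir.
+resp = requests.post(f"{COMPUTE_CTL}/extension_server/postgis", timeout=3)
+print(resp.status_code, resp.text)  # 200 "OK" on success
+
+# What `LOAD 'postgis-3.so'` (or a shared_preload_libraries entry) triggers:
+# is_library=true tells compute_ctl to place the file into pkglibdir instead.
+resp = requests.post(
+    f"{COMPUTE_CTL}/extension_server/postgis-3.so",
+    params={"is_library": "true"},
+    timeout=3,
+)
+print(resp.status_code, resp.text)
+```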
+
+## Extension Storage implementation
+
+Extension Storage in our case is an S3 bucket with a "directory" per build and postgres version,
+where extension files are stored as plain files in the bucket, following the same directory
+structure as in the PostgreSQL installation, i.e.
+
+`s3://<bucket>/<build_tag>/<pg_version>/lib/postgis-3.1.so`
+`s3://<bucket>/<build_tag>/<pg_version>/share/extension/postgis.control`
+`s3://<bucket>/<build_tag>/<pg_version>/share/extension/postgis--3.1.sql`
+
+To handle custom extensions that are available only to specific users, we use per-extension
+subdirectories, i.e.
+
+`s3://<bucket>/<build_tag>/<pg_version>/<custom_prefix>/lib/ext-name.so`, etc.
+`s3://<bucket>/<build_tag>/<pg_version>/<custom_prefix>/share/extension/ext-name.control`, etc.
+
+On compute start, `compute_ctl` accepts a list of `custom_ext_prefixes`.
+
+To get the list of available extensions, `compute_ctl` downloads control files from all prefixes:
+
+`s3://<bucket>/<build_tag>/<pg_version>/share/extension/`
+`s3://<bucket>/<build_tag>/<pg_version>/<custom_prefix_1>/share/extension/`
+`s3://<bucket>/<build_tag>/<pg_version>/<custom_prefix_2>/share/extension/`
+
+### How to add a new extension to the Extension Storage?
+
+Simply upload the build artifacts to the S3 bucket.
+Implement a CI step for that, splitting it from the compute-node-image build.
+
+### How do we deal with extension versions and updates?
+
+Currently, we rebuild extensions on every compute-node-image build and store them under the
+per-build prefix. This is needed to ensure that `/share` and `/lib` files are in sync.
+
+For extension updates, we rely on the PostgreSQL extension versioning mechanism (SQL update
+scripts) and on extension authors not breaking backwards compatibility within one major
+version of PostgreSQL.
+
+### Alternatives
+
+For extensions written in trusted languages we can also adopt the `dbdev` PostgreSQL package
+manager based on `pg_tle` by Supabase. This would increase the number of supported extensions
+and decrease the amount of work required to support them.
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index b3f0e9ba43..293f6dc294 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -60,6 +60,9 @@ pub struct ComputeSpec {
     /// If set, 'storage_auth_token' is used as the password to authenticate to
     /// the pageserver and safekeepers.
pub storage_auth_token: Option, + + // list of prefixes to search for custom extensions in remote extension storage + pub custom_extensions: Option>, } #[serde_as] diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 0e9c237e1e..51e5fe0adb 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -184,6 +184,20 @@ pub enum GenericRemoteStorage { } impl GenericRemoteStorage { + // A function for listing all the files in a "directory" + // Example: + // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"] + pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + match self { + Self::LocalFs(s) => s.list_files(folder).await, + Self::AwsS3(s) => s.list_files(folder).await, + Self::Unreliable(s) => s.list_files(folder).await, + } + } + + // lists common *prefixes*, if any of files + // Example: + // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"] pub async fn list_prefixes( &self, prefix: Option<&RemotePath>, @@ -195,14 +209,6 @@ impl GenericRemoteStorage { } } - pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { - match self { - Self::LocalFs(s) => s.list_files(folder).await, - Self::AwsS3(s) => s.list_files(folder).await, - Self::Unreliable(s) => s.list_files(folder).await, - } - } - pub async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 43d818dfb9..d71592eb93 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -349,10 +349,17 @@ impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_files` async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { - let folder_name = folder + let mut folder_name = folder .map(|p| self.relative_path_to_s3_object(p)) .or_else(|| self.prefix_in_bucket.clone()); + // remove leading "/" if one exists + if let Some(folder_name_slash) = folder_name.clone() { + if folder_name_slash.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + folder_name = Some(folder_name_slash[1..].to_string()); + } + } + // AWS may need to break the response into several parts let mut continuation_token = None; let mut all_files = vec![]; diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 1948023472..53917d8bc4 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -4,6 +4,7 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ + extension_server.o \ file_cache.o \ libpagestore.o \ libpqwalproposer.o \ diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c new file mode 100644 index 0000000000..16008200ef --- /dev/null +++ b/pgxn/neon/extension_server.c @@ -0,0 +1,104 @@ + +/*------------------------------------------------------------------------- + * + * extension_server.c + * Request compute_ctl to download extension files. 
+ * + * IDENTIFICATION + * contrib/neon/extension_server.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "tcop/pquery.h" +#include "tcop/utility.h" +#include "access/xact.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "commands/defrem.h" +#include "miscadmin.h" +#include "utils/acl.h" +#include "fmgr.h" +#include "utils/guc.h" +#include "port.h" +#include "fmgr.h" + +#include + +static int extension_server_port = 0; + +static download_extension_file_hook_type prev_download_extension_file_hook = NULL; + +// to download all SQL (and data) files for an extension: +// curl -X POST http://localhost:8080/extension_server/postgis +// it covers two possible extension files layouts: +// 1. extension_name--version--platform.sql +// 2. extension_name/extension_name--version.sql +// extension_name/extra_files.csv +// +// to download specific library file: +// curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true +static bool +neon_download_extension_file_http(const char *filename, bool is_library) +{ + CURL *curl; + CURLcode res; + char *compute_ctl_url; + char *postdata; + bool ret = false; + + if ((curl = curl_easy_init()) == NULL) + { + elog(ERROR, "Failed to initialize curl handle"); + } + + compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s", + extension_server_port, filename, is_library?"?is_library=true":""); + + elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); + + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */); + + + if (curl) + { + /* Perform the request, res will get the return code */ + res = curl_easy_perform(curl); + /* Check for errors */ + if (res == CURLE_OK) + { + ret = true; + } + else + { + // Don't error here because postgres will try to find the file + // and will fail with some proper error message if it's not found. + elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res)); + } + + /* always cleanup */ + curl_easy_cleanup(curl); + } + + return ret; +} + +void pg_init_extension_server() +{ + // Port to connect to compute_ctl on localhost + // to request extension files. + DefineCustomIntVariable("neon.extension_server_port", + "connection string to the compute_ctl", + NULL, + &extension_server_port, + 0, 0, INT_MAX, + PGC_POSTMASTER, + 0, /* no flags required */ + NULL, NULL, NULL); + + // set download_extension_file_hook + prev_download_extension_file_hook = download_extension_file_hook; + download_extension_file_hook = neon_download_extension_file_http; +} diff --git a/pgxn/neon/extension_server.h b/pgxn/neon/extension_server.h new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/pgxn/neon/extension_server.h @@ -0,0 +1 @@ + diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b45d7cfc32..c7211ea05a 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -35,8 +35,11 @@ _PG_init(void) { pg_init_libpagestore(); pg_init_walproposer(); + InitControlPlaneConnector(); + pg_init_extension_server(); + // Important: This must happen after other parts of the extension // are loaded, otherwise any settings to GUCs that were set before // the extension was loaded will be removed. 
diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 60d321a945..2610da4311 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -21,6 +21,8 @@ extern char *neon_tenant; extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); +extern void pg_init_extension_server(void); + /* * Returns true if we shouldn't do REDO on that block in record indicated by * block_id; false otherwise. diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e56bf78019..04ebd08e20 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -514,6 +514,16 @@ def available_remote_storages() -> List[RemoteStorageKind]: return remote_storages +def available_s3_storages() -> List[RemoteStorageKind]: + remote_storages = [RemoteStorageKind.MOCK_S3] + if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None: + remote_storages.append(RemoteStorageKind.REAL_S3) + log.info("Enabling real s3 storage for tests") + else: + log.info("Using mock implementations to test remote storage") + return remote_storages + + @dataclass class LocalFsStorage: root: Path @@ -534,6 +544,16 @@ class S3Storage: "AWS_SECRET_ACCESS_KEY": self.secret_key, } + def to_string(self) -> str: + return json.dumps( + { + "bucket": self.bucket_name, + "region": self.bucket_region, + "endpoint": self.endpoint, + "prefix": self.prefix_in_bucket, + } + ) + RemoteStorage = Union[LocalFsStorage, S3Storage] @@ -600,10 +620,12 @@ class NeonEnvBuilder: self.rust_log_override = rust_log_override self.port_distributor = port_distributor self.remote_storage = remote_storage + self.ext_remote_storage: Optional[S3Storage] = None + self.remote_storage_client: Optional[Any] = None self.remote_storage_users = remote_storage_users self.broker = broker self.run_id = run_id - self.mock_s3_server = mock_s3_server + self.mock_s3_server: MockS3Server = mock_s3_server self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers self.safekeepers_id_start = safekeepers_id_start @@ -651,15 +673,24 @@ class NeonEnvBuilder: remote_storage_kind: RemoteStorageKind, test_name: str, force_enable: bool = True, + enable_remote_extensions: bool = False, ): if remote_storage_kind == RemoteStorageKind.NOOP: return elif remote_storage_kind == RemoteStorageKind.LOCAL_FS: self.enable_local_fs_remote_storage(force_enable=force_enable) elif remote_storage_kind == RemoteStorageKind.MOCK_S3: - self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable) + self.enable_mock_s3_remote_storage( + bucket_name=test_name, + force_enable=force_enable, + enable_remote_extensions=enable_remote_extensions, + ) elif remote_storage_kind == RemoteStorageKind.REAL_S3: - self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable) + self.enable_real_s3_remote_storage( + test_name=test_name, + force_enable=force_enable, + enable_remote_extensions=enable_remote_extensions, + ) else: raise RuntimeError(f"Unknown storage type: {remote_storage_kind}") @@ -673,11 +704,15 @@ class NeonEnvBuilder: assert force_enable or self.remote_storage is None, "remote storage is enabled already" self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage")) - def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True): + def enable_mock_s3_remote_storage( + self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False + ): """ Sets up the pageserver to use the S3 mock 
server, creates the bucket, if it's not present already. Starts up the mock server, if that does not run yet. Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. + + Also creates the bucket for extensions, self.ext_remote_storage bucket """ assert force_enable or self.remote_storage is None, "remote storage is enabled already" mock_endpoint = self.mock_s3_server.endpoint() @@ -698,9 +733,22 @@ class NeonEnvBuilder: bucket_region=mock_region, access_key=self.mock_s3_server.access_key(), secret_key=self.mock_s3_server.secret_key(), + prefix_in_bucket="pageserver", ) - def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True): + if enable_remote_extensions: + self.ext_remote_storage = S3Storage( + bucket_name=bucket_name, + endpoint=mock_endpoint, + bucket_region=mock_region, + access_key=self.mock_s3_server.access_key(), + secret_key=self.mock_s3_server.secret_key(), + prefix_in_bucket="ext", + ) + + def enable_real_s3_remote_storage( + self, test_name: str, force_enable: bool = True, enable_remote_extensions: bool = False + ): """ Sets up configuration to use real s3 endpoint without mock server """ @@ -737,9 +785,18 @@ class NeonEnvBuilder: bucket_region=region, access_key=access_key, secret_key=secret_key, - prefix_in_bucket=self.remote_storage_prefix, + prefix_in_bucket=f"{self.remote_storage_prefix}/pageserver", ) + if enable_remote_extensions: + self.ext_remote_storage = S3Storage( + bucket_name=bucket_name, + bucket_region=region, + access_key=access_key, + secret_key=secret_key, + prefix_in_bucket=f"{self.remote_storage_prefix}/ext", + ) + def cleanup_local_storage(self): if self.preserve_database_files: return @@ -773,6 +830,7 @@ class NeonEnvBuilder: # `self.remote_storage_prefix` is coupled with `S3Storage` storage type, # so this line effectively a no-op assert isinstance(self.remote_storage, S3Storage) + assert self.remote_storage_client is not None if self.keep_remote_storage_contents: log.info("keep_remote_storage_contents skipping remote storage cleanup") @@ -902,6 +960,8 @@ class NeonEnv: self.neon_binpath = config.neon_binpath self.pg_distrib_dir = config.pg_distrib_dir self.endpoint_counter = 0 + self.remote_storage_client = config.remote_storage_client + self.ext_remote_storage = config.ext_remote_storage # generate initial tenant ID here instead of letting 'neon init' generate it, # so that we don't need to dig it out of the config file afterwards. @@ -1488,6 +1548,7 @@ class NeonCli(AbstractNeonCli): safekeepers: Optional[List[int]] = None, tenant_id: Optional[TenantId] = None, lsn: Optional[Lsn] = None, + remote_ext_config: Optional[str] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", @@ -1497,6 +1558,8 @@ class NeonCli(AbstractNeonCli): "--pg-version", self.env.pg_version, ] + if remote_ext_config is not None: + args.extend(["--remote-ext-config", remote_ext_config]) if lsn is not None: args.append(f"--lsn={lsn}") args.extend(["--pg-port", str(pg_port)]) @@ -2358,7 +2421,7 @@ class Endpoint(PgProtocol): return self - def start(self) -> "Endpoint": + def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint": """ Start the Postgres instance. Returns self. 
@@ -2374,6 +2437,7 @@ class Endpoint(PgProtocol): http_port=self.http_port, tenant_id=self.tenant_id, safekeepers=self.active_safekeepers, + remote_ext_config=remote_ext_config, ) self.running = True @@ -2463,6 +2527,7 @@ class Endpoint(PgProtocol): hot_standby: bool = False, lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, + remote_ext_config: Optional[str] = None, ) -> "Endpoint": """ Create an endpoint, apply config, and start Postgres. @@ -2477,7 +2542,7 @@ class Endpoint(PgProtocol): config_lines=config_lines, hot_standby=hot_standby, lsn=lsn, - ).start() + ).start(remote_ext_config=remote_ext_config) log.info(f"Postgres startup took {time.time() - started_at} seconds") @@ -2511,6 +2576,7 @@ class EndpointFactory: lsn: Optional[Lsn] = None, hot_standby: bool = False, config_lines: Optional[List[str]] = None, + remote_ext_config: Optional[str] = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -2527,6 +2593,7 @@ class EndpointFactory: hot_standby=hot_standby, config_lines=config_lines, lsn=lsn, + remote_ext_config=remote_ext_config, ) def create( diff --git a/test_runner/fixtures/types.py b/test_runner/fixtures/types.py index 7d179cc7fb..ef88e09de4 100644 --- a/test_runner/fixtures/types.py +++ b/test_runner/fixtures/types.py @@ -89,6 +89,9 @@ class TenantId(Id): def __repr__(self) -> str: return f'`TenantId("{self.id.hex()}")' + def __str__(self) -> str: + return self.id.hex() + class TimelineId(Id): def __repr__(self) -> str: diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py new file mode 100644 index 0000000000..eda4f5f593 --- /dev/null +++ b/test_runner/regress/test_download_extensions.py @@ -0,0 +1,247 @@ +import os +from contextlib import closing +from io import BytesIO + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PgBin, + RemoteStorageKind, + available_s3_storages, +) +from fixtures.pg_version import PgVersion +from fixtures.types import TenantId + +NUM_EXT = 3 + + +def control_file_content(owner, i): + output = f"""# mock {owner} extension{i} +comment = 'This is a mock extension' +default_version = '1.0' +module_pathname = '$libdir/test_ext{i}' +relocatable = true""" + return output + + +def sql_file_content(): + output = """ + CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + """ + return output + + +# Prepare some mock extension files and upload them to the bucket +# returns a list of files that should be cleaned up after the test +def prepare_mock_ext_storage( + pg_version: PgVersion, + tenant_id: TenantId, + pg_bin: PgBin, + ext_remote_storage, + remote_storage_client, +): + bucket_prefix = ext_remote_storage.prefix_in_bucket + custom_prefix = str(tenant_id) + PUB_EXT_ROOT = f"v{pg_version}/share/postgresql/extension" + PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/postgresql/extension" + LOCAL_EXT_ROOT = f"pg_install/{PUB_EXT_ROOT}" + + PUB_LIB_ROOT = f"v{pg_version}/lib" + PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib" + LOCAL_LIB_ROOT = f"{pg_bin.pg_lib_dir}/postgresql" + + log.info( + f""" + PUB_EXT_ROOT: {PUB_EXT_ROOT} + PRIVATE_EXT_ROOT: {PRIVATE_EXT_ROOT} + LOCAL_EXT_ROOT: {LOCAL_EXT_ROOT} + PUB_LIB_ROOT: {PUB_LIB_ROOT} + PRIVATE_LIB_ROOT: {PRIVATE_LIB_ROOT} + LOCAL_LIB_ROOT: {LOCAL_LIB_ROOT} + """ + ) + + cleanup_files = [] + + # Upload several test_ext{i}.control files to the bucket + for i in 
range(NUM_EXT): + public_ext = BytesIO(bytes(control_file_content("public", i), "utf-8")) + public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control" + public_local_name = f"{LOCAL_EXT_ROOT}/test_ext{i}.control" + custom_ext = BytesIO(bytes(control_file_content(str(tenant_id), i), "utf-8")) + custom_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/custom_ext{i}.control" + custom_local_name = f"{LOCAL_EXT_ROOT}/custom_ext{i}.control" + cleanup_files += [public_local_name, custom_local_name] + + remote_storage_client.upload_fileobj( + public_ext, ext_remote_storage.bucket_name, public_remote_name + ) + remote_storage_client.upload_fileobj( + custom_ext, ext_remote_storage.bucket_name, custom_remote_name + ) + + # Upload SQL file for the extension we're going to create + sql_filename = "test_ext0--1.0.sql" + test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}" + test_sql_local_path = f"{LOCAL_EXT_ROOT}/{sql_filename}" + test_ext_sql_file = BytesIO(bytes(sql_file_content(), "utf-8")) + remote_storage_client.upload_fileobj( + test_ext_sql_file, + ext_remote_storage.bucket_name, + test_sql_public_remote_path, + ) + cleanup_files += [test_sql_local_path] + + # upload some fake library files + for i in range(2): + public_library = BytesIO(bytes("\n111\n", "utf-8")) + public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so" + public_local_name = f"{LOCAL_LIB_ROOT}/test_lib{i}.so" + custom_library = BytesIO(bytes("\n111\n", "utf-8")) + custom_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/custom_lib{i}.so" + custom_local_name = f"{LOCAL_LIB_ROOT}/custom_lib{i}.so" + + log.info(f"uploading library to {public_remote_name}") + log.info(f"uploading library to {custom_remote_name}") + + remote_storage_client.upload_fileobj( + public_library, + ext_remote_storage.bucket_name, + public_remote_name, + ) + remote_storage_client.upload_fileobj( + custom_library, + ext_remote_storage.bucket_name, + custom_remote_name, + ) + cleanup_files += [public_local_name, custom_local_name] + + return cleanup_files + + +# Generate mock extension files and upload them to the bucket. +# +# Then check that compute nodes can download them and use them +# to CREATE EXTENSION and LOAD 'library.so' +# +# NOTE: You must have appropriate AWS credentials to run REAL_S3 test. 
+# It may also be necessary to set the following environment variables:
+# export AWS_ACCESS_KEY_ID='test'
+# export AWS_SECRET_ACCESS_KEY='test'
+# export AWS_SECURITY_TOKEN='test'
+# export AWS_SESSION_TOKEN='test'
+# export AWS_DEFAULT_REGION='us-east-1'
+#
+@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
+def test_remote_extensions(
+    neon_env_builder: NeonEnvBuilder,
+    remote_storage_kind: RemoteStorageKind,
+    pg_version: PgVersion,
+    pg_bin: PgBin,
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_remote_extensions",
+        enable_remote_extensions=True,
+    )
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+    tenant_id, _ = env.neon_cli.create_tenant()
+    env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
+
+    assert env.ext_remote_storage is not None
+    assert env.remote_storage_client is not None
+
+    # Prepare some mock extension files and upload them to the bucket
+    cleanup_files = prepare_mock_ext_storage(
+        pg_version,
+        tenant_id,
+        pg_bin,
+        env.ext_remote_storage,
+        env.remote_storage_client,
+    )
+    # Start a compute node and check that it can download the extensions
+    # and use them to CREATE EXTENSION and LOAD 'library.so'
+    #
+    # This block is wrapped in a try/finally so that the downloaded files
+    # are cleaned up even if the test fails
+    try:
+        endpoint = env.endpoints.create_start(
+            "test_remote_extensions",
+            tenant_id=tenant_id,
+            remote_ext_config=env.ext_remote_storage.to_string(),
+            # config_lines=["log_min_messages=debug3"],
+        )
+        with closing(endpoint.connect()) as conn:
+            with conn.cursor() as cur:
+                # Check that the mock extensions were downloaded and appear in pg_available_extensions
+                cur.execute("SELECT * FROM pg_available_extensions")
+                all_extensions = [x[0] for x in cur.fetchall()]
+                log.info(all_extensions)
+                for i in range(NUM_EXT):
+                    assert f"test_ext{i}" in all_extensions
+                    assert f"custom_ext{i}" in all_extensions
+
+                cur.execute("CREATE EXTENSION test_ext0")
+                cur.execute("SELECT extname FROM pg_extension")
+                all_extensions = [x[0] for x in cur.fetchall()]
+                log.info(all_extensions)
+                assert "test_ext0" in all_extensions
+
+                # Try to load existing library file
+                try:
+                    cur.execute("LOAD 'test_lib0.so'")
+                except Exception as e:
+                    # expected to fail with
+                    # could not load library ... test_lib0.so: file too short
+                    # because test_lib0.so is not a real library file
+                    log.info("LOAD test_lib0.so failed (expectedly): %s", e)
+                    assert "file too short" in str(e)
+
+                # Try to load custom library file
+                try:
+                    cur.execute("LOAD 'custom_lib0.so'")
+                except Exception as e:
+                    # expected to fail with
+                    # could not load library ... custom_lib0.so: file too short
+                    # because custom_lib0.so is not a real library file
+                    log.info("LOAD custom_lib0.so failed (expectedly): %s", e)
+                    assert "file too short" in str(e)
+
+                # Try to load existing library file without .so extension
+                try:
+                    cur.execute("LOAD 'test_lib1'")
+                except Exception as e:
+                    # expected to fail with
+                    # could not load library ... test_lib1.so: file too short
+                    # because test_lib1.so is not a real library file
+                    log.info("LOAD test_lib1 failed (expectedly): %s", e)
+                    assert "file too short" in str(e)
+
+                # Try to load non-existent library file
+                try:
+                    cur.execute("LOAD 'test_lib_fail.so'")
+                except Exception as e:
+                    # expected to fail because test_lib_fail.so is not found
+                    log.info("LOAD test_lib_fail.so failed (expectedly): %s", e)
+                    assert (
+                        """could not access file "test_lib_fail.so": No such file or directory"""
+                        in str(e)
+                    )
+
+    finally:
+        # Clean up the downloaded files. This is important: if they were left behind,
+        # the test could pass without actually downloading anything, because a previous
+        # run (or a run with a different type of remote storage) may already have
+        # downloaded the files.
+        for file in cleanup_files:
+            try:
+                os.remove(file)
+                log.info(f"Deleted {file}")
+            except FileNotFoundError:
+                log.info(f"{file} does not exist, so cannot be deleted")
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index 7c3424cf32..458bd8dc19 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -275,6 +275,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
     assert isinstance(neon_env_builder.remote_storage, S3Storage)
 
     # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
+    assert neon_env_builder.remote_storage_client is not None
     response = neon_env_builder.remote_storage_client.list_objects_v2(
         Bucket=neon_env_builder.remote_storage.bucket_name,
         Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
@@ -628,7 +629,7 @@ def test_timeline_delete_works_for_remote_smoke(
     )
 
     # for some reason the check above doesnt immediately take effect for the below.
-    # Assume it is mock server incosistency and check twice.
+    # Assume it is mock server inconsistency and check twice.
     wait_until(
         2,
         0.5,
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index a2daebc6b4..93a5ee7749 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit a2daebc6b445dcbcca9c18e1711f47c1db7ffb04
+Subproject commit 93a5ee7749f109ecb9e5481be485c8cb17fe72ce
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index 2df2ce3744..293a06e5e1 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit 2df2ce374464a7449e15dfa46c956b73b4f4098b
+Subproject commit 293a06e5e14ed9be3f5002c63b4fac391491ec17
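
Note (not part of the patch above): test_remote_extensions relies on a fixed object layout in the
extension bucket: public files under v{pg_version}/share/postgresql/extension and v{pg_version}/lib,
and per-tenant files under the same paths prefixed with the tenant id. The sketch below shows one way
that layout could be inspected with a plain boto3 client during debugging; the endpoint URL and dummy
credentials are assumptions for illustration only (the test itself reuses env.remote_storage_client and
env.ext_remote_storage), and list_objects_v2 is the same boto3 call already used by assert_prefix_empty
in test_timeline_delete.py.

    import boto3


    def list_mock_extension_objects(bucket: str, prefix: str, pg_version: str, tenant_id: str) -> None:
        # Assumed local S3-compatible endpoint and dummy credentials, for illustration only.
        client = boto3.client(
            "s3",
            endpoint_url="http://127.0.0.1:9000",
            aws_access_key_id="test",
            aws_secret_access_key="test",
        )
        # Mirrors PUB_EXT_ROOT / PRIVATE_EXT_ROOT / PUB_LIB_ROOT / PRIVATE_LIB_ROOT above.
        for sub in (
            f"{prefix}/v{pg_version}/share/postgresql/extension/",
            f"{prefix}/v{pg_version}/{tenant_id}/share/postgresql/extension/",
            f"{prefix}/v{pg_version}/lib/",
            f"{prefix}/v{pg_version}/{tenant_id}/lib/",
        ):
            resp = client.list_objects_v2(Bucket=bucket, Prefix=sub)
            keys = [obj["Key"] for obj in resp.get("Contents", [])]
            print(f"{sub} -> {keys}")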