diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 148a970c16..293e9725f6 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -198,7 +198,7 @@ fn main() -> Result<()> {
         state: Mutex::new(new_state),
         state_changed: Condvar::new(),
         ext_remote_storage,
-        available_libraries: OnceLock::new(),
+        // available_libraries: OnceLock::new(),
         available_extensions: OnceLock::new(),
     };
     let compute = Arc::new(compute_node);
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 188544536c..1fe13d5c3a 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -20,7 +20,6 @@ use compute_api::spec::{ComputeMode, ComputeSpec};
 
 use remote_storage::{GenericRemoteStorage, RemotePath};
 
-use crate::extension_server::PathAndFlag;
 use crate::pg_helpers::*;
 use crate::spec::*;
 use crate::{config, extension_server};
@@ -54,8 +53,8 @@ pub struct ComputeNode {
     /// the S3 bucket that we search for extensions in
     pub ext_remote_storage: Option<GenericRemoteStorage>,
     // cached lists of available extensions and libraries
-    pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
-    pub available_extensions: OnceLock<HashMap<String, Vec<PathAndFlag>>>,
+    // pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
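+    // cache of the remote extension index: maps an entry name from
+    // ext_index.json to the RemotePath of that extension's archive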
+    pub available_extensions: OnceLock<HashMap<String, RemotePath>>,
 }
 
 #[derive(Clone, Debug)]
@@ -534,16 +533,15 @@ impl ComputeNode {
 
         // This part is sync, because we need to download
         // remote shared_preload_libraries before postgres start (if any)
-        let library_load_start_time = Utc::now();
         {
-            self.prepare_extenal_libraries(&compute_state)?;
+            let library_load_start_time = Utc::now();
+            self.prepare_preload_libraries(&compute_state)?;
             let library_load_time = Utc::now()
                 .signed_duration_since(library_load_start_time)
                 .to_std()
                 .unwrap()
                 .as_millis() as u64;
-
             let mut state = self.state.lock().unwrap();
             state.metrics.load_libraries_ms = library_load_time;
             info!(
@@ -689,86 +687,6 @@ LIMIT 100",
         }
     }
 
-    // If remote extension storage is configured,
-    // download shared preload libraries.
-    #[tokio::main]
-    pub async fn prepare_extenal_libraries(&self, compute_state: &ComputeState) -> Result<()> {
-        if let Some(ref ext_remote_storage) = self.ext_remote_storage {
-            let pspec = compute_state.pspec.as_ref().expect("spec must be set");
-            // download preload shared libraries before postgres start (if any)
-            let spec = &pspec.spec;
-
-            // 1. parse custom extension paths from spec
-            let custom_ext_prefixes = match &spec.custom_extensions {
-                Some(custom_extensions) => custom_extensions.clone(),
-                None => Vec::new(),
-            };
-
-            info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
-
-            // parse shared_preload_libraries from spec
-            let mut libs_vec = Vec::new();
-
-            if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
-                libs_vec = libs
-                    .split(&[',', '\'', ' '])
-                    .filter(|s| *s != "neon" && !s.is_empty())
-                    .map(str::to_string)
-                    .collect();
-            }
-
-            info!(
-                "shared_preload_libraries parsed from spec.cluster.settings: {:?}",
-                libs_vec
-            );
-
-            // also parse shared_preload_libraries from provided postgresql.conf
-            // that is used in neon_local and python tests
-            if let Some(conf) = &spec.cluster.postgresql_conf {
-                let conf_lines = conf.split('\n').collect::<Vec<&str>>();
-
-                let mut shared_preload_libraries_line = "";
-                for line in conf_lines {
-                    if line.starts_with("shared_preload_libraries") {
-                        shared_preload_libraries_line = line;
-                    }
-                }
-
-                let mut preload_libs_vec = Vec::new();
-                if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
-                    preload_libs_vec = libs
-                        .split(&[',', '\'', ' '])
-                        .filter(|s| *s != "neon" && !s.is_empty())
-                        .map(str::to_string)
-                        .collect();
-                }
-
-                info!(
-                    "shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}",
-                    preload_libs_vec
-                );
-
-                libs_vec.extend(preload_libs_vec);
-            }
-
-            info!("Libraries to download: {:?}", &libs_vec);
-            // download shared_preload_libraries
-            let available_libraries = extension_server::get_available_libraries(
-                ext_remote_storage,
-                &self.pgbin,
-                &self.pgversion,
-                &custom_ext_prefixes,
-                &libs_vec,
-            )
-            .await?;
-
-            self.available_libraries
-                .set(available_libraries)
-                .expect("available_libraries.set error");
-        }
-        Ok(())
-    }
-
     // If remote extension storage is configured,
     // download extension control files
     #[tokio::main]
@@ -801,37 +719,29 @@
         Ok(())
     }
 
-    pub async fn download_extension_files(&self, filename: String) -> Result<()> {
+    pub async fn download_extension(&self, ext_name: &str) -> Result<()> {
         match &self.ext_remote_storage {
             None => anyhow::bail!("No remote extension storage"),
             Some(remote_storage) => {
-                extension_server::download_extension_files(
-                    &filename,
-                    remote_storage,
-                    &self.pgbin,
+                extension_server::download_extension(
+                    ext_name,
                     self.available_extensions
                         .get()
-                        .context("available_extensions broke")?,
+                        .context("available_extensions is not initialized")?
+                        .get(ext_name)
+                        .context("cannot find extension")?,
+                    remote_storage,
+                    &self.pgbin,
                 )
                 .await
             }
         }
     }
 
-    pub async fn download_library_file(&self, filename: String) -> Result<()> {
-        match &self.ext_remote_storage {
-            None => anyhow::bail!("No remote extension storage"),
-            Some(remote_storage) => {
-                extension_server::download_library_file(
-                    &filename,
-                    remote_storage,
-                    &self.pgbin,
-                    self.available_libraries
-                        .get()
-                        .context("available_libraries broke")?,
-                )
-                .await
-            }
-        }
-    }
+    #[tokio::main]
+    pub async fn prepare_preload_libraries(&self, compute_state: &ComputeState) -> Result<()> {
+        // TODO: revive some of the old logic for downloading shared preload libraries
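+        //
+        // A rough sketch (commented out) of the parsing step that could be
+        // revived from the removed prepare_extenal_libraries above; the names
+        // follow the old code, and the actual download call is still open:
+        //
+        //     if let Some(pspec) = &compute_state.pspec {
+        //         if let Some(libs) = pspec.spec.cluster.settings.find("shared_preload_libraries") {
+        //             let libs_vec: Vec<String> = libs
+        //                 .split(&[',', '\'', ' '])
+        //                 .filter(|s| *s != "neon" && !s.is_empty())
+        //                 .map(str::to_string)
+        //                 .collect();
+        //             info!("shared_preload_libraries to download: {:?}", libs_vec);
+        //         }
+        //     }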
+        info!("downloading shared preload libraries is not implemented yet; skipping");
+        Ok(())
     }
 }
diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs
index 6cb62f8b6d..0ea6f52fd3 100644
--- a/compute_tools/src/extension_server.rs
+++ b/compute_tools/src/extension_server.rs
@@ -1,30 +1,20 @@
 // Download extension files from the extension store
 // and put them in the right place in the postgres directory
 use crate::compute::ComputeNode;
-use anyhow::{self, bail, Context, Result};
-use futures::future::{join_all, Remote};
+use anyhow::{self, bail, Result};
 use remote_storage::*;
 use serde_json::{self, Value};
 use std::collections::HashMap;
 use std::fs::File;
-use std::io::{BufWriter, Write};
+use std::io::BufWriter;
+use std::io::Write;
 use std::num::{NonZeroU32, NonZeroUsize};
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::str;
 use std::sync::Arc;
 use std::thread;
 use tokio::io::AsyncReadExt;
-use tracing::info;
-
-// remote!
-const SHARE_EXT_PATH: &str = "share/extension";
-
-fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
-    for result in results {
-        result?;
-    }
-    Ok(())
-}
+use tracing::{info, warn};
 
 fn get_pg_config(argument: &str, pgbin: &str) -> String {
     // gives the result of `pg_config [argument]`
@@ -52,233 +42,93 @@ pub fn get_pg_version(pgbin: &str) -> String {
     panic!("Unsuported postgres version {human_version}");
 }
 
-async fn download_helper(
-    remote_storage: &GenericRemoteStorage,
-    remote_from_path: RemotePath,
-    sub_directory: Option<&str>,
-    download_location: &Path,
-) -> anyhow::Result<()> {
-    // downloads file at remote_from_path to
-    // `download_location/[optional: subdirectory]/[remote_storage.object_name()]`
-    // Note: the subdirectory commmand is needed when there is an extension that
-    // depends on files in a subdirectory.
-    // For example, v14/share/extension/some_ext.control
-    // might depend on v14/share/extension/some_ext/some_ext--1.1.0.sql
-    // and v14/share/extension/some_ext/xxx.csv
-    // Note: it is the caller's responsibility to create the appropriate subdirectory
-
-    let local_path = match sub_directory {
-        Some(subdir) => download_location
-            .join(subdir)
-            .join(remote_from_path.object_name().expect("bad object")),
-        None => download_location.join(remote_from_path.object_name().expect("bad object")),
-    };
-    if local_path.exists() {
-        info!("File {:?} already exists. Skipping download", &local_path);
-        return Ok(());
-    }
-    info!(
-        "Downloading {:?} to location {:?}",
-        &remote_from_path, &local_path
-    );
-    let mut download = remote_storage.download(&remote_from_path).await?;
-    let mut write_data_buffer = Vec::new();
-    download
-        .download_stream
-        .read_to_end(&mut write_data_buffer)
-        .await?;
-    let mut output_file = BufWriter::new(File::create(local_path)?);
-    output_file.write_all(&write_data_buffer)?;
-    info!("Download {:?} completed successfully", &remote_from_path);
-    Ok(())
-}
-
 // download extension control files
-//
 // if custom_ext_prefixes is provided - search also in custom extension paths
-//
 pub async fn get_available_extensions(
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
     pg_version: &str,
     custom_ext_prefixes: &Vec<String>,
-) -> anyhow::Result<Vec<RemotePath>> {
+) -> Result<HashMap<String, RemotePath>> {
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-    // public path, plus any private paths to download extensions from
-    let mut paths: Vec<RemotePath> = Vec::new();
-    paths.push(RemotePath::new(&Path::new(pg_version).join("public"))?);
-    for custom_prefix in custom_ext_prefixes {
-        paths.push(RemotePath::new(&Path::new(pg_version).join(custom_prefix))?);
-    }
-    let (control_files, zip_files) = organized_extension_files(remote_storage, &paths).await?;
-    let mut control_file_download_tasks = Vec::new();
-    // download all control files
-    for control_file in control_files {
-        control_file_download_tasks.push(download_helper(
-            remote_storage,
-            control_file.clone(),
-            None,
-            &local_sharedir,
-        ));
-    }
-    pass_any_error(join_all(control_file_download_tasks).await)?;
-    Ok(zip_files)
-}
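+    // The index file is assumed to look roughly like this; the shape is
+    // inferred from the parsing below and is not a finalized format:
+    //
+    //     {
+    //       "public": { "control": "<contents of the .control file>", "path": "public/some_ext.zip" },
+    //       "my_prefix": { "control": "...", "path": "my_prefix/private_ext.zip" }
+    //     }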
Skipping download", &local_path); - return Ok(()); - } - info!( - "Downloading {:?} to location {:?}", - &remote_from_path, &local_path - ); - let mut download = remote_storage.download(&remote_from_path).await?; - let mut write_data_buffer = Vec::new(); - download - .download_stream - .read_to_end(&mut write_data_buffer) - .await?; - let mut output_file = BufWriter::new(File::create(local_path)?); - output_file.write_all(&write_data_buffer)?; - info!("Download {:?} completed successfully", &remote_from_path); - Ok(()) -} - // download extension control files -// // if custom_ext_prefixes is provided - search also in custom extension paths -// pub async fn get_available_extensions( remote_storage: &GenericRemoteStorage, pgbin: &str, pg_version: &str, custom_ext_prefixes: &Vec, -) -> anyhow::Result> { +) -> Result> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); - // public path, plus any private paths to download extensions from - let mut paths: Vec = Vec::new(); - paths.push(RemotePath::new(&Path::new(pg_version).join("public"))?); - for custom_prefix in custom_ext_prefixes { - paths.push(RemotePath::new(&Path::new(pg_version).join(custom_prefix))?); - } - let (control_files, zip_files) = organized_extension_files(remote_storage, &paths).await?; - let mut control_file_download_tasks = Vec::new(); - // download all control files - for control_file in control_files { - control_file_download_tasks.push(download_helper( - remote_storage, - control_file.clone(), - None, - &local_sharedir, - )); - } - pass_any_error(join_all(control_file_download_tasks).await)?; - Ok(zip_files) -} + let index_path = RemotePath::new(Path::new(&format!("{:?}/ext_index.json", pg_version))) + .expect("error forming path"); + let mut download = remote_storage.download(&index_path).await?; + let mut write_data_buffer = Vec::new(); + download + .download_stream + .read_to_end(&mut write_data_buffer) + .await?; + let ext_index_str = + serde_json::to_string(&write_data_buffer).expect("Failed to convert to JSON"); + let ext_index_full = match serde_json::from_str(&ext_index_str) { + Ok(Value::Object(map)) => map, + _ => bail!("error parsing json"), + }; -pub async fn download_extensions( - remote_storage: &GenericRemoteStorage, - pgbin: &str, - pg_version: &str, - custom_ext_prefixes: &Vec, -) { - // OK I was just going to download everything, but that seems wrong. -} + let mut prefixes = vec!["public".to_string()]; + prefixes.extend(custom_ext_prefixes.clone()); + let mut ext_index_limited = HashMap::new(); + for prefix in prefixes { + let ext_details_str = ext_index_full.get(&prefix); + if let Some(ext_details_str) = ext_details_str { + let ext_details = + serde_json::to_string(ext_details_str).expect("Failed to convert to JSON"); + let ext_details = match serde_json::from_str(&ext_details) { + Ok(Value::Object(map)) => map, + _ => bail!("error parsing json"), + }; + let control_contents = match ext_details.get("control").expect("broken json file") { + Value::String(s) => s, + _ => bail!("broken json file"), + }; + let path = RemotePath::new(Path::new(&format!( + "{:?}/{:?}", + pg_version, + ext_details.get("path") + ))) + .expect("error forming path"); -// Download requested shared_preload_libraries -// -// Note that tenant_id is not optional here, because we only download libraries -// after we know the tenant spec and the tenant_id. 
+    info!("unzip {zip_name:?}");
+    info!("place extension files in {local_sharedir:?}");
+    info!("place library files in {local_libdir:?}");
 
-    info!("start download");
-    let mut download_tasks = Vec::new();
-    if let Some(files) = all_available_files.get(ext_name) {
-        info!("Downloading files for extension {:?}", &ext_name);
-        for path_and_flag in files {
-            let file = &path_and_flag.path;
-            let subdir_flag = path_and_flag.subdir_flag;
-            info!(
-                "--- Downloading {:?} (for {:?} as subdir? = {:?})",
-                &file, &ext_name, subdir_flag
-            );
-            let mut subdir = None;
-            if subdir_flag {
-                subdir = Some(ext_name);
-                if !made_subdir {
-                    made_subdir = true;
-                    std::fs::create_dir_all(local_sharedir.join(ext_name))?;
-                }
-            }
-            download_tasks.push(download_helper(
-                remote_storage,
-                file.clone(),
-                subdir,
-                &local_sharedir,
-            ));
-            downloaded_something = true;
-        }
-    }
-    if !downloaded_something {
-        bail!("Files for extension {ext_name} are not found in the extension store");
-    }
-    pass_any_error(join_all(download_tasks).await)?;
-    info!("finish download");
-    Ok(())
-}
-
-// appends an .so suffix to libname if it does not already have one
-fn enforce_so_end(libname: &str) -> String {
-    if !libname.contains(".so") {
-        format!("{}.so", libname)
-    } else {
-        libname.to_string()
-    }
-}
-
-// download shared library file
-pub async fn download_library_file(
-    lib_name: &str,
-    remote_storage: &GenericRemoteStorage,
-    pgbin: &str,
-    all_available_libraries: &HashMap<String, Vec<RemotePath>>,
-) -> Result<()> {
-    let lib_name = get_library_name(lib_name);
-    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
-    info!("looking for library {:?}", &lib_name);
-    match all_available_libraries.get(&*lib_name) {
-        Some(remote_paths) => {
-            let mut library_download_tasks = Vec::new();
-            for remote_path in remote_paths {
-                let file_path = local_libdir.join(remote_path.object_name().expect("bad object"));
-                if file_path.exists() {
-                    info!("File {:?} already exists. Skipping download", &file_path);
-                } else {
-                    library_download_tasks.push(download_helper(
-                        remote_storage,
-                        remote_path.clone(),
-                        None,
-                        &local_libdir,
-                    ));
-                }
-            }
-            pass_any_error(join_all(library_download_tasks).await)?;
-        }
-        None => {
-            // minor TODO: this logic seems to be somewhat faulty for .so.3 type files?
-            let lib_name_with_ext = enforce_so_end(&lib_name);
-            let file_path = local_libdir.join(lib_name_with_ext);
-            if file_path.exists() {
-                info!("File {:?} already exists. Skipping download", &file_path);
-            } else {
-                bail!("Library file {lib_name} not found")
-            }
-        }
-    }
     Ok(())
 }
 
@@ -307,7 +157,8 @@ pub fn init_remote_storage(
         _ => Some(default_prefix.to_string()),
     };
 
-    // load will not be large, so default parameters are fine
+    // TODO: is this a valid assumption? some extensions are quite large
+    // load should not be large, so default parameters are fine
    let config = S3Config {
        bucket_name: remote_ext_bucket.to_string(),
        bucket_region: remote_ext_region.to_string(),
@@ -324,103 +175,6 @@ pub fn init_remote_storage(
     GenericRemoteStorage::from_config(&config)
 }
 
-fn get_library_name(path: &str) -> String {
-    let path_suffix: Vec<&str> = path.split('/').collect();
-    let path_suffix = path_suffix.last().expect("bad ext name").to_string();
-    if let Some(index) = path_suffix.find(".so") {
-        return path_suffix[..index].to_string();
-    }
-    path_suffix
-}
-
-// asyncrounously lists files in all necessary directories
-// TODO: potential optimization: do a single list files on the entire bucket
-// and then filter out the files we don't need
-async fn list_all_files(
-    remote_storage: &GenericRemoteStorage,
-    paths: &Vec<RemotePath>,
-) -> Result<Vec<RemotePath>> {
-    let mut list_tasks = Vec::new();
-    let mut all_files = Vec::new();
-    for path in paths {
-        list_tasks.push(remote_storage.list_files(Some(path)));
-    }
-    for list_result in join_all(list_tasks).await {
-        all_files.extend(list_result?);
-    }
-    Ok(all_files)
-}
-
-// helper to collect all libraries, grouped by library name
-// Returns a hashmap of (library name: [paths]})
-// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]}
-async fn organized_library_files(
-    remote_storage: &GenericRemoteStorage,
-    paths: &Vec<RemotePath>,
-) -> Result<HashMap<String, Vec<RemotePath>>> {
-    let mut library_groups = HashMap::new();
-    for file in list_all_files(remote_storage, paths).await? {
-        let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?);
-        let lib_list = library_groups.entry(lib_name).or_insert(Vec::new());
-        lib_list.push(file.to_owned());
-    }
-    Ok(library_groups)
-}
-
-// store a path, paired with a flag indicating whether the path is to a file in
-// the root or subdirectory
-#[derive(Debug)]
-pub struct PathAndFlag {
-    path: RemotePath,
-    subdir_flag: bool,
-}
-
-// get_ext_name extracts the extension name, and returns a flag indicating
-// whether this file is in a subdirectory or not.
-//
-// extension files can be in subdirectories of the extension store.
-// examples of layout:
-// v14//share//extension/extension_name--1.0.sql,
-// v14//share//extension/extension_name/extension_name--1.0.sql,
-// v14//share//extension/extension_name/extra_data.csv
-// Note: we *assume* that the extension files is in one of these formats.
-// If it is not, this code's behavior is *undefined*.
-fn get_ext_name(path: &str) -> Result<(&str, bool)> {
-    let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
-    let ext_name = path_suffix.last().expect("bad ext name");
-
-    if let Some(index) = ext_name.find('/') {
-        return Ok((&ext_name[..index], true));
-    } else if let Some(index) = ext_name.find("--") {
-        return Ok((&ext_name[..index], false));
-    }
-    Ok((ext_name, false))
-}
-
-// list files in all given directories and find the zip / control file in them
-async fn organized_extension_files(
-    remote_storage: &GenericRemoteStorage,
-    paths: &Vec<RemotePath>,
-) -> Result<(Vec<RemotePath>, Vec<RemotePath>)> {
-    let mut control_files = Vec::new();
-    let mut zip_files = Vec::new();
-
-    let mut list_file_tasks = Vec::new();
-    for path in paths {
-        list_file_tasks.push(remote_storage.list_files(Some(path)));
-    }
-    for list_result in join_all(list_file_tasks).await {
-        for file in list_result? {
-            if file.extension().expect("bad file name") == "control" {
-                control_files.push(file.to_owned());
-            } else {
-                zip_files.push(file.to_owned());
-            }
-        }
-    }
-    Ok((control_files, zip_files))
-}
-
 pub fn launch_download_extensions(
     compute: &Arc<ComputeNode>,
 ) -> Result<thread::JoinHandle<()>, std::io::Error> {
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index 0fbb334199..8fd40cdfe1 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -125,47 +125,19 @@
             info!("serving {:?} POST request", route);
             info!("req.uri {:?}", req.uri());
-
-            let mut is_library = false;
-
-            if let Some(params) = req.uri().query() {
-                info!("serving {:?} POST request with params: {}", route, params);
-
-                if params == "is_library=true" {
-                    is_library = true;
-                } else {
-                    let mut resp = Response::new(Body::from("Wrong request parameters"));
-                    *resp.status_mut() = StatusCode::BAD_REQUEST;
-                    return resp;
-                }
-            }
-
             let filename = route.split('/').last().unwrap().to_string();
             info!(
-                "serving /extension_server POST request, filename: {:?} is_library: {}",
-                filename, is_library
+                "serving /extension_server POST request, filename: {:?}",
+                &filename
             );
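+            // Example (hypothetical host/port) of driving this endpoint:
+            //
+            //     curl -X POST http://<compute_ctl_host>:<http_port>/extension_server/some_ext
+            //
+            // which downloads the archive for `some_ext` and installs its files.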
= {:?})", - &file, &ext_name, subdir_flag - ); - let mut subdir = None; - if subdir_flag { - subdir = Some(ext_name); - if !made_subdir { - made_subdir = true; - std::fs::create_dir_all(local_sharedir.join(ext_name))?; - } - } - download_tasks.push(download_helper( - remote_storage, - file.clone(), - subdir, - &local_sharedir, - )); - downloaded_something = true; - } - } - if !downloaded_something { - bail!("Files for extension {ext_name} are not found in the extension store"); - } - pass_any_error(join_all(download_tasks).await)?; - info!("finish download"); - Ok(()) -} - -// appends an .so suffix to libname if it does not already have one -fn enforce_so_end(libname: &str) -> String { - if !libname.contains(".so") { - format!("{}.so", libname) - } else { - libname.to_string() - } -} - -// download shared library file -pub async fn download_library_file( - lib_name: &str, - remote_storage: &GenericRemoteStorage, - pgbin: &str, - all_available_libraries: &HashMap>, -) -> Result<()> { - let lib_name = get_library_name(lib_name); - let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); - info!("looking for library {:?}", &lib_name); - match all_available_libraries.get(&*lib_name) { - Some(remote_paths) => { - let mut library_download_tasks = Vec::new(); - for remote_path in remote_paths { - let file_path = local_libdir.join(remote_path.object_name().expect("bad object")); - if file_path.exists() { - info!("File {:?} already exists. Skipping download", &file_path); - } else { - library_download_tasks.push(download_helper( - remote_storage, - remote_path.clone(), - None, - &local_libdir, - )); - } - } - pass_any_error(join_all(library_download_tasks).await)?; - } - None => { - // minor TODO: this logic seems to be somewhat faulty for .so.3 type files? - let lib_name_with_ext = enforce_so_end(&lib_name); - let file_path = local_libdir.join(lib_name_with_ext); - if file_path.exists() { - info!("File {:?} already exists. Skipping download", &file_path); - } else { - bail!("Library file {lib_name} not found") - } - } - } Ok(()) } @@ -307,7 +157,8 @@ pub fn init_remote_storage( _ => Some(default_prefix.to_string()), }; - // load will not be large, so default parameters are fine + // TODO: is this a valid assumption? 
some extensions are quite large + // load should not be large, so default parameters are fine let config = S3Config { bucket_name: remote_ext_bucket.to_string(), bucket_region: remote_ext_region.to_string(), @@ -324,103 +175,6 @@ pub fn init_remote_storage( GenericRemoteStorage::from_config(&config) } -fn get_library_name(path: &str) -> String { - let path_suffix: Vec<&str> = path.split('/').collect(); - let path_suffix = path_suffix.last().expect("bad ext name").to_string(); - if let Some(index) = path_suffix.find(".so") { - return path_suffix[..index].to_string(); - } - path_suffix -} - -// asyncrounously lists files in all necessary directories -// TODO: potential optimization: do a single list files on the entire bucket -// and then filter out the files we don't need -async fn list_all_files( - remote_storage: &GenericRemoteStorage, - paths: &Vec, -) -> Result> { - let mut list_tasks = Vec::new(); - let mut all_files = Vec::new(); - for path in paths { - list_tasks.push(remote_storage.list_files(Some(path))); - } - for list_result in join_all(list_tasks).await { - all_files.extend(list_result?); - } - Ok(all_files) -} - -// helper to collect all libraries, grouped by library name -// Returns a hashmap of (library name: [paths]}) -// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]} -async fn organized_library_files( - remote_storage: &GenericRemoteStorage, - paths: &Vec, -) -> Result>> { - let mut library_groups = HashMap::new(); - for file in list_all_files(remote_storage, paths).await? { - let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?); - let lib_list = library_groups.entry(lib_name).or_insert(Vec::new()); - lib_list.push(file.to_owned()); - } - Ok(library_groups) -} - -// store a path, paired with a flag indicating whether the path is to a file in -// the root or subdirectory -#[derive(Debug)] -pub struct PathAndFlag { - path: RemotePath, - subdir_flag: bool, -} - -// get_ext_name extracts the extension name, and returns a flag indicating -// whether this file is in a subdirectory or not. -// -// extension files can be in subdirectories of the extension store. -// examples of layout: -// v14//share//extension/extension_name--1.0.sql, -// v14//share//extension/extension_name/extension_name--1.0.sql, -// v14//share//extension/extension_name/extra_data.csv -// Note: we *assume* that the extension files is in one of these formats. -// If it is not, this code's behavior is *undefined*. -fn get_ext_name(path: &str) -> Result<(&str, bool)> { - let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect(); - let ext_name = path_suffix.last().expect("bad ext name"); - - if let Some(index) = ext_name.find('/') { - return Ok((&ext_name[..index], true)); - } else if let Some(index) = ext_name.find("--") { - return Ok((&ext_name[..index], false)); - } - Ok((ext_name, false)) -} - -// list files in all given directories and find the zip / control file in them -async fn organized_extension_files( - remote_storage: &GenericRemoteStorage, - paths: &Vec, -) -> Result<(Vec, Vec)> { - let mut control_files = Vec::new(); - let mut zip_files = Vec::new(); - - let mut list_file_tasks = Vec::new(); - for path in paths { - list_file_tasks.push(remote_storage.list_files(Some(path))); - } - for list_result in join_all(list_file_tasks).await { - for file in list_result? 
{ - if file.extension().expect("bad file name") == "control" { - control_files.push(file.to_owned()); - } else { - zip_files.push(file.to_owned()); - } - } - } - Ok((control_files, zip_files)) -} - pub fn launch_download_extensions( compute: &Arc, ) -> Result, std::io::Error> { diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 0fbb334199..8fd40cdfe1 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -125,47 +125,19 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving {:?} POST request", route); info!("req.uri {:?}", req.uri()); - - let mut is_library = false; - - if let Some(params) = req.uri().query() { - info!("serving {:?} POST request with params: {}", route, params); - - if params == "is_library=true" { - is_library = true; - } else { - let mut resp = Response::new(Body::from("Wrong request parameters")); - *resp.status_mut() = StatusCode::BAD_REQUEST; - return resp; - } - } - let filename = route.split('/').last().unwrap().to_string(); - info!( - "serving /extension_server POST request, filename: {:?} is_library: {}", - filename, is_library + "serving /extension_server POST request, filename: {:?}", + &filename ); - if is_library { - match compute.download_library_file(filename.to_string()).await { - Ok(_) => Response::new(Body::from("OK")), - Err(e) => { - error!("library download failed: {}", e); - let mut resp = Response::new(Body::from(e.to_string())); - *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - resp - } - } - } else { - match compute.download_extension_files(filename.to_string()).await { - Ok(_) => Response::new(Body::from("OK")), - Err(e) => { - error!("extension download failed: {}", e); - let mut resp = Response::new(Body::from(e.to_string())); - *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - resp - } + match compute.download_extension(&filename).await { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("extension download failed: {}", e); + let mut resp = Response::new(Body::from(e.to_string())); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + resp } } } diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index c3eedc6bfa..4e298567e1 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -1,17 +1,14 @@ -import json from contextlib import closing import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, - PgBin, RemoteStorageKind, available_s3_storages, ) from fixtures.pg_version import PgVersion - # Generate mock extension files and upload them to the mock bucket. # # NOTE: You must have appropriate AWS credentials to run REAL_S3 test.