From a6c9a4abe7f3e8257521c4a1494e6fa8e989cb0c Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Wed, 28 Jun 2023 14:51:55 -0400 Subject: [PATCH] cache extensions --- compute_tools/src/bin/compute_ctl.rs | 1 + compute_tools/src/compute.rs | 9 +- compute_tools/src/extension_server.rs | 135 +++++++++++++------------- 3 files changed, 71 insertions(+), 74 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 96694e5360..2719f5d1b4 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -193,6 +193,7 @@ fn main() -> Result<()> { state_changed: Condvar::new(), ext_remote_storage, available_libraries: Mutex::new(HashMap::new()), + available_extensions: Mutex::new(HashMap::new()), }; let compute = Arc::new(compute_node); diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d421b08cee..ea19b39b68 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -53,6 +53,7 @@ pub struct ComputeNode { /// S3 extensions configuration variables pub ext_remote_storage: Option, pub available_libraries: Mutex>, + pub available_extensions: Mutex>>, } #[derive(Clone, Debug)] @@ -733,7 +734,8 @@ LIMIT 100", // download extension control files & shared_preload_libraries - extension_server::get_available_extensions( + let mut available_extensions_lock = self.available_extensions.lock().unwrap(); + *available_extensions_lock = extension_server::get_available_extensions( ext_remote_storage, &self.pgbin, &self.pgversion, @@ -769,13 +771,12 @@ LIMIT 100", }; info!("private_ext_prefixes: {:?}", &private_ext_prefixes); - + let available_extensions_lock = self.available_extensions.lock().unwrap().clone(); extension_server::download_extension_sql_files( &filename, remote_storage, &self.pgbin, - &self.pgversion, - &private_ext_prefixes, + &available_extensions_lock, ) .await } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 9d7edfd5c3..702eb20577 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -1,6 +1,6 @@ // Download extension files from the extension store // and put them in the right place in the postgres directory -use anyhow::{self, bail, Result}; +use anyhow::{self, bail, Context, Result}; use remote_storage::*; use serde_json::{self, Value}; use std::collections::HashMap; @@ -12,6 +12,8 @@ use std::str; use tokio::io::AsyncReadExt; use tracing::info; +const SHARE_EXT_PATH: &str = "share/postgresql/extension"; + fn get_pg_config(argument: &str, pgbin: &str) -> String { // gives the result of `pg_config [argument]` // where argument is a flag like `--version` or `--sharedir` @@ -81,7 +83,6 @@ async fn download_helper( .download_stream .read_to_end(&mut write_data_buffer) .await?; - //dbg!(str::from_utf8(&write_data_buffer)?); if remote_from_prefix.is_some() { if let Some(prefix) = local_path.parent() { info!( @@ -106,24 +107,24 @@ pub async fn get_available_extensions( pgbin: &str, pg_version: &str, private_ext_prefixes: &Vec, -) -> anyhow::Result<()> { +) -> anyhow::Result>> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); let mut paths: Vec = Vec::new(); // public extensions paths.push(RemotePath::new( - &Path::new(pg_version).join("share/postgresql/extension"), + &Path::new(pg_version).join(SHARE_EXT_PATH), )?); // private extensions for private_prefix in private_ext_prefixes { paths.push(RemotePath::new( &Path::new(pg_version) .join(private_prefix) - .join("share/postgresql/extension"), + .join(SHARE_EXT_PATH), )?); } - let all_available_files = list_files_in_prefixes(remote_storage, &paths).await?; + let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?; info!( "list of available_extension files {:?}", @@ -131,13 +132,15 @@ pub async fn get_available_extensions( ); // download all control files - for (obj_name, obj_path) in &all_available_files { - if obj_name.ends_with("control") { - download_helper(remote_storage, obj_path, None, &local_sharedir).await?; + for (obj_name, obj_paths) in &all_available_files { + for obj_path in obj_paths { + if obj_name.ends_with("control") { + download_helper(remote_storage, obj_path, None, &local_sharedir).await?; + } } } - Ok(()) + Ok(all_available_files) } // Download requested shared_preload_libraries @@ -201,74 +204,22 @@ pub async fn download_extension_sql_files( ext_name: &str, remote_storage: &GenericRemoteStorage, pgbin: &str, - pg_version: &str, - private_ext_prefixes: &Vec, + all_available_files: &HashMap>, ) -> Result<()> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + let mut downloaded_something = false; - let mut paths: Vec = Vec::new(); - // public extensions - paths.push(RemotePath::new( - &Path::new(&pg_version).join("share/postgresql/extension"), - )?); - // private extensions - for private_prefix in private_ext_prefixes { - paths.push(RemotePath::new( - &Path::new(&pg_version) - .join(private_prefix) - .join("share/postgresql/extension"), - )?); - } - - let all_available_files: HashMap = - list_files_in_prefixes(remote_storage, &paths).await?; - - info!( - "list of available_extension files {:?}", - &all_available_files - ); - - // check if extension files exist - let mut files_to_download: Vec<(&RemotePath, Option<&Path>)> = Vec::new(); - - for (obj_name, obj_path) in &all_available_files { - // ignore control files - if !obj_name.ends_with("control") { - // We can't use just ext.object_name() here - // because extension files can be in subdirectories of the extension store. - // examples of layout: - // - // share/postgresql/extension/extension_name--1.0.sql - // - // or - // - // share/postgresql/extension/extension_name/extension_name--1.0.sql - // share/postgresql/extension/extension_name/extra_data.csv - // - for prefix in paths.iter() { - if let Ok(full_object_name) = obj_path.get_path().strip_prefix(prefix.get_path()) { - if full_object_name.to_str().unwrap().starts_with(ext_name) { - files_to_download.push((obj_path, Some(prefix.get_path()))); - } - } + if let Some(files) = all_available_files.get(ext_name) { + for file in files { + if file.extension().context("bad file name")? != "control" { + downloaded_something = true; + download_helper(remote_storage, file, None, &local_sharedir).await?; } } } - - if files_to_download.is_empty() { + if !downloaded_something { bail!("Files for extension {ext_name} are not found in the extension store"); } - - for (remote_from_path, remote_from_prefix) in files_to_download { - download_helper( - remote_storage, - remote_from_path, - remote_from_prefix, - &local_sharedir, - ) - .await?; - } - Ok(()) } @@ -355,3 +306,47 @@ async fn list_files_in_prefixes( Ok(res) } + +// helper to extract extension name +// extension files can be in subdirectories of the extension store. +// examples of layout: +// +// share/postgresql/extension/extension_name--1.0.sql +// +// or +// +// share/postgresql/extension/extension_name/extension_name--1.0.sql +// share/postgresql/extension/extension_name/extra_data.csv +// +// Note: we **assume** that the extension files is in one of these formats. +// If it is not, this code will not download it. +fn get_ext_name(path: &str) -> Result<&str> { + let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect(); + let path_suffix = path_suffix.last().expect("bad ext name"); + for index in ["--", "/"] { + if let Some(index) = path_suffix.find(index) { + return Ok(&path_suffix[..index]); + } + } + Ok(path_suffix) +} + +// helper to collect files of given prefixes for extensions +// and group them by extension +// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension) +async fn list_files_in_prefixes_for_extensions( + remote_storage: &GenericRemoteStorage, + paths: &Vec, +) -> Result>> { + let mut result = HashMap::new(); + for path in paths { + for file in remote_storage.list_files(Some(&path)).await? { + let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?; + let ext_file_list = result + .entry(file_ext_name.to_string()) + .or_insert(Vec::new()); + ext_file_list.push(file.to_owned()); + } + } + Ok(result) +}