From 3ce678b3bbc64bc7bf3b9bc3776921ee309b5098 Mon Sep 17 00:00:00 2001
From: Alek Westover
Date: Tue, 4 Jul 2023 10:29:35 -0400
Subject: [PATCH] Fix paths to match infra more closely. Make extension_server
 actually async. Handle more complex cases of extensions with their
 dependencies.

---
 compute_tools/src/bin/compute_ctl.rs         |  16 +-
 compute_tools/src/compute.rs                 |  33 +-
 compute_tools/src/extension_server.rs        | 366 ++++++++++--------
 control_plane/src/endpoint.rs                |   2 +-
 pgxn/neon/extension_server.c                 |   6 +-
 .../regress/test_download_extensions.py      |  57 +--
 6 files changed, 260 insertions(+), 220 deletions(-)

diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 5aed7fd0b8..4f6934efa2 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -72,18 +72,10 @@ fn main() -> Result<()> {
     let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
     let remote_ext_config = matches.get_one::<String>("remote-ext-config");

-    let ext_remote_storage = match remote_ext_config {
-        Some(x) => match init_remote_storage(x, build_tag) {
-            Ok(y) => Some(y),
-            Err(e) => {
-                panic!(
-                    "cannot initialize remote extension storage from config {}: {}",
-                    x, e
-                );
-            }
-        },
-        None => None,
-    };
+    let ext_remote_storage = remote_ext_config.map(|x| {
+        init_remote_storage(x, build_tag)
+            .expect("cannot initialize remote extension storage from config")
+    });

     let http_port = *matches
         .get_one::<u16>("http-port")
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 314a3fe7c3..c88e3c1c9d 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -20,6 +20,7 @@
 use compute_api::spec::{ComputeMode, ComputeSpec};
 use remote_storage::{GenericRemoteStorage, RemotePath};

+use crate::extension_server::PathAndFlag;
 use crate::pg_helpers::*;
 use crate::spec::*;
 use crate::{config, extension_server};
@@ -53,8 +54,8 @@ pub struct ComputeNode {
     /// the S3 bucket that we search for extensions in
     pub ext_remote_storage: Option<GenericRemoteStorage>,
     // cached lists of available extensions and libraries
-    pub available_libraries: OnceLock<HashMap<String, RemotePath>>,
-    pub available_extensions: OnceLock<HashMap<String, Vec<RemotePath>>>,
+    pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
+    pub available_extensions: OnceLock<HashMap<String, Vec<PathAndFlag>>>,
 }

 #[derive(Clone, Debug)]
@@ -531,6 +532,9 @@ impl ComputeNode {
             pspec.timeline_id,
         );

+        // TODO FIXME: this should not be blocking here
+        // Maybe we can run it as a child process?
+        // Also, worth measuring how long this step is taking
         self.prepare_external_extensions(&compute_state)?;

         self.prepare_pgdata(&compute_state, extension_server_port)?;
@@ -734,33 +738,30 @@ LIMIT 100",
         }
         info!("Libraries to download: {:?}", &libs_vec);

-        // download extension control files & shared_preload_libraries
-
-        let available_extensions = extension_server::get_available_extensions(
+        let get_extensions_task = extension_server::get_available_extensions(
             ext_remote_storage,
             &self.pgbin,
             &self.pgversion,
             &custom_ext_prefixes,
-        )
-        .await?;
-        self.available_extensions
-            .set(available_extensions)
-            .expect("available_extensions.set error");
-
-        let available_libraries = extension_server::get_available_libraries(
+        );
+        let get_libraries_task = extension_server::get_available_libraries(
             ext_remote_storage,
             &self.pgbin,
             &self.pgversion,
             &custom_ext_prefixes,
             &libs_vec,
-        )
-        .await?;
+        );
+
+        let (available_extensions, available_libraries) =
+            tokio::join!(get_extensions_task, get_libraries_task);
+        self.available_extensions
+            .set(available_extensions?)
+            .expect("available_extensions.set error");
         self.available_libraries
-            .set(available_libraries)
+            .set(available_libraries?)
             .expect("available_libraries.set error");
     }
-
     Ok(())
 }
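The hunk above turns the two catalog fetches from sequential `.await`s into a pair of futures driven together by `tokio::join!`. For illustration, a minimal self-contained sketch of that pattern; `fetch_extensions` and `fetch_libraries` are hypothetical stand-ins for the real `get_available_*` calls, not code from this patch:

    use anyhow::Result;

    // Hypothetical stand-ins for get_available_extensions / get_available_libraries.
    async fn fetch_extensions() -> Result<usize> {
        Ok(7)
    }

    async fn fetch_libraries() -> Result<usize> {
        Ok(3)
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // Constructing the futures does not start them; tokio::join! polls
        // both to completion concurrently on the same task.
        let ext_task = fetch_extensions();
        let lib_task = fetch_libraries();
        let (exts, libs) = tokio::join!(ext_task, lib_task);
        // As in the hunk above, each Result is checked only after both finish.
        println!("{} extensions, {} libraries available", exts?, libs?);
        Ok(())
    }

`tokio::join!` overlaps the I/O waits of both futures on one task rather than adding parallelism, which is exactly what two independent S3 listings need.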
+ .expect("available_extensions.set error"); self.available_libraries - .set(available_libraries) + .set(available_libraries?) .expect("available_libraries.set error"); } - Ok(()) } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 11b42abf8c..99b9fbff8e 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -1,6 +1,7 @@ // Download extension files from the extension store // and put them in the right place in the postgres directory use anyhow::{self, bail, Context, Result}; +use futures::future::join_all; use remote_storage::*; use serde_json::{self, Value}; use std::collections::HashMap; @@ -12,7 +13,15 @@ use std::str; use tokio::io::AsyncReadExt; use tracing::info; -const SHARE_EXT_PATH: &str = "share/postgresql/extension"; +// remote! +const SHARE_EXT_PATH: &str = "share/extension"; + +fn pass_any_error(results: Vec>) -> Result<()> { + for result in results { + result?; + } + Ok(()) +} fn get_pg_config(argument: &str, pgbin: &str) -> String { // gives the result of `pg_config [argument]` @@ -42,59 +51,42 @@ pub fn get_pg_version(pgbin: &str) -> String { async fn download_helper( remote_storage: &GenericRemoteStorage, - remote_from_path: &RemotePath, - remote_from_prefix: Option<&Path>, + remote_from_path: RemotePath, + sub_directory: Option<&str>, download_location: &Path, ) -> anyhow::Result<()> { - // downloads file at remote_from_path to download_location/[file_name] + // downloads file at remote_from_path to + // `download_location/[optional: subdirectory]/[remote_storage.object_name()]` + // Note: the subdirectory commmand is needed when there is an extension that + // depends on files in a subdirectory. + // For example, v14/share/extension/some_ext.control + // might depend on v14/share/extension/some_ext/some_ext--1.1.0.sql + // and v14/share/extension/some_ext/xxx.csv + // Note: it is the caller's responsibility to create the appropriate subdirectory - // we cannot use remote_from_path.object_name() here - // because extension files can be in subdirectories of the extension store. - // - // To handle this, we use remote_from_prefix to strip the prefix from the path - // this gives us the relative path of the file in the extension store, - // and we use this relative path to construct the local path. - // - let local_path = match remote_from_prefix { - Some(prefix) => { - let p = remote_from_path - .get_path() - .strip_prefix(prefix) - .expect("bad prefix"); - - download_location.join(p) - } + let local_path = match sub_directory { + Some(subdir) => download_location + .join(subdir) + .join(remote_from_path.object_name().expect("bad object")), None => download_location.join(remote_from_path.object_name().expect("bad object")), }; - if local_path.exists() { info!("File {:?} already exists. Skipping download", &local_path); return Ok(()); } - info!( "Downloading {:?} to location {:?}", &remote_from_path, &local_path ); - let mut download = remote_storage.download(remote_from_path).await?; + let mut download = remote_storage.download(&remote_from_path).await?; let mut write_data_buffer = Vec::new(); download .download_stream .read_to_end(&mut write_data_buffer) .await?; - if remote_from_prefix.is_some() { - if let Some(prefix) = local_path.parent() { - info!( - "Downloading file with prefix. 
-            // if directory already exists, this is a no-op
-            std::fs::create_dir_all(prefix)?;
-        }
-    }
-
     let mut output_file = BufWriter::new(File::create(local_path)?);
     output_file.write_all(&write_data_buffer)?;
+    info!("Download {:?} completed successfully", &remote_from_path);
     Ok(())
 }
@@ -107,15 +99,14 @@
 pub async fn get_available_extensions(
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
     pg_version: &str,
     custom_ext_prefixes: &Vec<String>,
-) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
+) -> anyhow::Result<HashMap<String, Vec<PathAndFlag>>> {
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
+    // public path, plus any private paths to download extensions from
     let mut paths: Vec<RemotePath> = Vec::new();
-    // public extensions
     paths.push(RemotePath::new(
         &Path::new(pg_version).join(SHARE_EXT_PATH),
     )?);
-    // custom extensions
     for custom_prefix in custom_ext_prefixes {
         paths.push(RemotePath::new(
             &Path::new(pg_version)
                 .join(custom_prefix)
                 .join(SHARE_EXT_PATH),
         )?);
     }

-    let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?;
-
-    info!(
-        "list of available_extension files {:?}",
-        &all_available_files
-    );
+    let (extension_files, control_files) =
+        organized_extension_files(remote_storage, &paths).await?;

+    let mut control_file_download_tasks = Vec::new();
     // download all control files
-    for (obj_name, obj_paths) in &all_available_files {
-        for obj_path in obj_paths {
-            if obj_name.ends_with("control") {
-                download_helper(remote_storage, obj_path, None, &local_sharedir).await?;
-            }
-        }
+    for control_file in control_files {
+        control_file_download_tasks.push(download_helper(
+            remote_storage,
+            control_file.clone(),
+            None,
+            &local_sharedir,
+        ));
     }
-
-    Ok(all_available_files)
+    pass_any_error(join_all(control_file_download_tasks).await)?;
+    Ok(extension_files)
 }

@@ -155,11 +144,9 @@
 // Download requested shared_preload_libraries
 pub async fn get_available_libraries(
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
     pg_version: &str,
     custom_ext_prefixes: &Vec<String>,
     preload_libraries: &Vec<String>,
-) -> anyhow::Result<HashMap<String, RemotePath>> {
-    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
+) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
     // Construct a hashmap of all available libraries
-    // example (key, value) pair: test_lib0.so, v14/lib/test_lib0.so
-
+    // example (key, value) pair: test_lib0: [RemotePath(v14/lib/test_lib0.so), RemotePath(v14/lib/test_lib0.so.3)]
     let mut paths: Vec<RemotePath> = Vec::new();
     // public libraries
     paths.push(
         RemotePath::new(&Path::new(pg_version).join("lib"))
             .expect("The hard coded path here is valid"),
     );
     // custom libraries
     for custom_prefix in custom_ext_prefixes {
         paths.push(
             RemotePath::new(&Path::new(pg_version).join(custom_prefix).join("lib"))
                 .expect("The hard coded path here is valid"),
         );
     }
-
-    let all_available_libraries = list_files_in_prefixes(remote_storage, &paths).await?;
+    let all_available_libraries = organized_library_files(remote_storage, &paths).await?;
     info!("list of library files {:?}", &all_available_libraries);
-    // download all requested libraries
+    let mut download_tasks = Vec::new();
     for lib_name in preload_libraries {
-        // add file extension if it isn't in the filename
-        let lib_name_with_ext = enforce_so_end(lib_name);
-        info!("looking for library {:?}", &lib_name_with_ext);
-
-        match all_available_libraries.get(&*lib_name_with_ext) {
-            Some(remote_path) => {
-                download_helper(remote_storage, remote_path, None, &local_libdir).await?
-            }
-            None => {
-                let file_path = local_libdir.join(&lib_name_with_ext);
-                if file_path.exists() {
-                    info!("File {:?} already exists. Skipping download", &file_path);
-                } else {
-                    bail!("Shared library file {lib_name} is not found in the extension store")
-                }
-            }
-        }
+        download_tasks.push(download_library_file(
+            lib_name,
+            remote_storage,
+            pgbin,
+            &all_available_libraries,
+        ));
     }
-
+    pass_any_error(join_all(download_tasks).await)?;
     Ok(all_available_libraries)
 }
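Both functions above now follow the same queue-then-await shape: push lazy futures into a Vec, drive them with `join_all`, and reduce the results with `pass_any_error`. A standalone sketch of that shape; the `fake_download` helper is invented for illustration:

    use anyhow::{bail, Result};
    use futures::future::join_all;

    // Same shape as pass_any_error above: surface the first error, if any.
    fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
        for result in results {
            result?;
        }
        Ok(())
    }

    // Hypothetical stand-in for download_helper / download_library_file.
    async fn fake_download(name: &str) -> Result<()> {
        if name.is_empty() {
            bail!("empty object name");
        }
        println!("downloaded {name}");
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut tasks = Vec::new();
        for name in ["test_ext0.control", "test_ext1.control"] {
            // Rust futures are lazy: pushing one queues work without starting it.
            tasks.push(fake_download(name));
        }
        // join_all polls every future concurrently and collects all results.
        pass_any_error(join_all(tasks).await)
    }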
Skipping download", &file_path); - } else { - bail!("Shared library file {lib_name} is not found in the extension store") - } - } - } + download_tasks.push(download_library_file( + lib_name, + remote_storage, + pgbin, + &all_available_libraries, + )); } - + pass_any_error(join_all(download_tasks).await)?; Ok(all_available_libraries) } @@ -208,47 +183,54 @@ pub async fn download_extension_files( ext_name: &str, remote_storage: &GenericRemoteStorage, pgbin: &str, - all_available_files: &HashMap>, + all_available_files: &HashMap>, ) -> Result<()> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); let mut downloaded_something = false; + let mut made_subdir = false; + info!("EXTENSION {:?}", ext_name); + info!("{:?}", all_available_files.get(ext_name)); + + info!("start download"); + let mut download_tasks = Vec::new(); if let Some(files) = all_available_files.get(ext_name) { - for file in files { - if file.extension().context("bad file name")? != "control" { - // find files prefix to handle cases when extension files are stored - // in a directory with the same name as the extension - // example: - // share/postgresql/extension/extension_name/extension_name--1.0.sql - let index = file - .get_path() - .to_str() - .context("invalid path")? - .find(ext_name) - .context("invalid path")?; - - let prefix_str = - file.get_path().to_str().context("invalid path")?[..index].to_string(); - let remote_from_prefix = if prefix_str.is_empty() { - None - } else { - Some(Path::new(&prefix_str)) - }; - - download_helper(remote_storage, file, remote_from_prefix, &local_sharedir).await?; - downloaded_something = true; + info!("Downloading files for extension {:?}", &ext_name); + for path_and_flag in files { + let file = &path_and_flag.path; + let subdir_flag = path_and_flag.subdir_flag; + info!( + "--- Downloading {:?} (for {:?} as subdir? = {:?})", + &file, &ext_name, subdir_flag + ); + let mut subdir = None; + if subdir_flag { + subdir = Some(ext_name); + if !made_subdir { + made_subdir = true; + std::fs::create_dir_all(local_sharedir.join(ext_name))?; + } } + download_tasks.push(download_helper( + remote_storage, + file.clone(), + subdir, + &local_sharedir, + )); + downloaded_something = true; } } if !downloaded_something { bail!("Files for extension {ext_name} are not found in the extension store"); } + pass_any_error(join_all(download_tasks).await)?; + info!("finish download"); Ok(()) } // appends an .so suffix to libname if it does not already have one fn enforce_so_end(libname: &str) -> String { - if !libname.ends_with(".so") { + if !libname.contains(".so") { format!("{}.so", libname) } else { libname.to_string() @@ -260,20 +242,44 @@ pub async fn download_library_file( lib_name: &str, remote_storage: &GenericRemoteStorage, pgbin: &str, - all_available_libraries: &HashMap, + all_available_libraries: &HashMap>, ) -> Result<()> { + let lib_name = get_library_name(lib_name); let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); - let lib_name_with_ext = enforce_so_end(lib_name); - info!("looking for library {:?}", &lib_name_with_ext); - match all_available_libraries.get(&*lib_name_with_ext) { - Some(remote_path) => { - download_helper(remote_storage, remote_path, None, &local_libdir).await? 
+ info!("looking for library {:?}", &lib_name); + match all_available_libraries.get(&*lib_name) { + Some(remote_paths) => { + let mut library_download_tasks = Vec::new(); + for remote_path in remote_paths { + let file_path = local_libdir.join(remote_path.object_name().expect("bad object")); + if file_path.exists() { + info!("File {:?} already exists. Skipping download", &file_path); + } else { + library_download_tasks.push(download_helper( + remote_storage, + remote_path.clone(), + None, + &local_libdir, + )); + } + } + pass_any_error(join_all(library_download_tasks).await)?; + } + None => { + // minor TODO: this logic seems to be somewhat faulty for .so.3 type files? + let lib_name_with_ext = enforce_so_end(&lib_name); + let file_path = local_libdir.join(lib_name_with_ext); + if file_path.exists() { + info!("File {:?} already exists. Skipping download", &file_path); + } else { + bail!("Library file {lib_name} not found") + } } - None => bail!("Shared library file {lib_name} is not found in the extension store"), } Ok(()) } +// This function initializes the necessary structs to use remmote storage (should be fairly cheap) pub fn init_remote_storage( remote_ext_config: &str, default_prefix: &str, @@ -315,70 +321,108 @@ pub fn init_remote_storage( GenericRemoteStorage::from_config(&config) } -// helper to collect all files in the given prefixes -// returns hashmap of (file_name, file_remote_path) -async fn list_files_in_prefixes( +fn get_library_name(path: &str) -> String { + let path_suffix: Vec<&str> = path.split('/').collect(); + let path_suffix = path_suffix.last().expect("bad ext name").to_string(); + if let Some(index) = path_suffix.find(".so") { + return path_suffix[..index].to_string(); + } + path_suffix +} + +// asyncrounously lists files in all necessary directories +// TODO: potential optimization: do a single list files on the entire bucket +// and then filter out the files we don't need +async fn list_all_files( remote_storage: &GenericRemoteStorage, paths: &Vec, -) -> Result> { - let mut res = HashMap::new(); - +) -> Result> { + let mut list_tasks = Vec::new(); + let mut all_files = Vec::new(); for path in paths { - for file in remote_storage.list_files(Some(path)).await? { - res.insert( - file.object_name().expect("bad object").to_owned(), - file.to_owned(), - ); - } + list_tasks.push(remote_storage.list_files(Some(path))); } - - Ok(res) + for list_result in join_all(list_tasks).await { + all_files.extend(list_result?); + } + Ok(all_files) } -// helper to extract extension name -// extension files can be in subdirectories of the extension store. -// examples of layout: -// -// share/postgresql/extension/extension_name--1.0.sql -// -// or -// -// share/postgresql/extension/extension_name/extension_name--1.0.sql -// share/postgresql/extension/extension_name/extra_data.csv -// -// Note: we **assume** that the extension files is in one of these formats. -// If it is not, this code will not download it. 
-// helper to extract extension name
-// extension files can be in subdirectories of the extension store.
-// examples of layout:
-//
-// share/postgresql/extension/extension_name--1.0.sql
-//
-// or
-//
-// share/postgresql/extension/extension_name/extension_name--1.0.sql
-// share/postgresql/extension/extension_name/extra_data.csv
-//
-// Note: we **assume** that the extension files is in one of these formats.
-// If it is not, this code will not download it.
-fn get_ext_name(path: &str) -> Result<&str> {
-    let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
-
-    let path_suffix = path_suffix.last().expect("bad ext name");
-    // the order of these is important
-    // otherwise we'll return incorrect extension name
-    // for path like share/postgresql/extension/extension_name/extension_name--1.0.sql
-    for index in ["/", "--"] {
-        if let Some(index) = path_suffix.find(index) {
-            return Ok(&path_suffix[..index]);
-        }
-    }
-    Ok(path_suffix)
-}
-
-// helper to collect files of given prefixes for extensions
-// and group them by extension
-// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
-async fn list_files_in_prefixes_for_extensions(
+// helper to collect all libraries, grouped by library name
+// Returns a hashmap of (library_name: [paths])
+// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]}
+async fn organized_library_files(
     remote_storage: &GenericRemoteStorage,
     paths: &Vec<RemotePath>,
-) -> Result<HashMap<String, Vec<RemotePath>>> {
-    let mut result = HashMap::new();
-    for path in paths {
-        for file in remote_storage.list_files(Some(path)).await? {
-            let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?;
-            let ext_file_list = result
+) -> Result<HashMap<String, Vec<RemotePath>>> {
+    let mut library_groups = HashMap::new();
+    for file in list_all_files(remote_storage, paths).await? {
+        let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?);
+        let lib_list = library_groups.entry(lib_name).or_insert(Vec::new());
+        lib_list.push(file.to_owned());
+    }
+    Ok(library_groups)
+}
+
+// store a path, paired with a flag indicating whether the path is to a file in
+// the root or a subdirectory
+#[derive(Debug)]
+pub struct PathAndFlag {
+    path: RemotePath,
+    subdir_flag: bool,
+}
+
+// get_ext_name extracts the extension name, and returns a flag indicating
+// whether this file is in a subdirectory or not.
+//
+// extension files can be in subdirectories of the extension store.
+// examples of layout:
+// v14/share/extension/extension_name--1.0.sql,
+// v14/share/extension/extension_name/extension_name--1.0.sql,
+// v14/share/extension/extension_name/extra_data.csv
+// Note: we *assume* that the extension files are in one of these formats.
+// If they are not, this code's behavior is *undefined*.
+fn get_ext_name(path: &str) -> Result<(&str, bool)> {
+    let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
+    let ext_name = path_suffix.last().expect("bad ext name");
+
+    if let Some(index) = ext_name.find('/') {
+        return Ok((&ext_name[..index], true));
+    } else if let Some(index) = ext_name.find("--") {
+        return Ok((&ext_name[..index], false));
+    }
+    Ok((ext_name, false))
+}
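Concretely, the two supported layouts map to (name, subdir flag) pairs as follows. This is a lightly adapted, self-contained copy of `get_ext_name` with `SHARE_EXT_PATH` inlined:

    use anyhow::Result;

    const SHARE_EXT_PATH: &str = "share/extension";

    // A '/' after the share/extension prefix means the file lives in a
    // per-extension subdirectory; "--" marks a versioned SQL script.
    fn get_ext_name(path: &str) -> Result<(&str, bool)> {
        let parts: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
        let ext_name = *parts.last().expect("bad ext name");
        if let Some(index) = ext_name.find('/') {
            return Ok((&ext_name[..index], true));
        } else if let Some(index) = ext_name.find("--") {
            return Ok((&ext_name[..index], false));
        }
        Ok((ext_name, false))
    }

    fn main() -> Result<()> {
        assert_eq!(get_ext_name("v14/share/extension/anon--1.1.0.sql")?, ("anon", false));
        assert_eq!(get_ext_name("v14/share/extension/anon/address.csv")?, ("anon", true));
        assert_eq!(get_ext_name("v14/share/extension/anon/anon--1.1.0.sql")?, ("anon", true));
        Ok(())
    }

Note that the '/' check runs before the "--" check; for a path like anon/anon--1.1.0.sql the subdirectory, not the version separator, must win.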
+// helper to collect files of given prefixes for extensions and group them by extension
+// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
+// and a list of control files
+// For example, an entry in the hashmap could be
+// {"anon": [RemotePath("v14/anon/share/extension/anon/address.csv"),
+//           RemotePath("v14/anon/share/extension/anon/anon--1.1.0.sql")]},
+// with the corresponding entry in the control file list being
+// RemotePath("v14/anon/share/extension/anon.control")
+async fn organized_extension_files(
+    remote_storage: &GenericRemoteStorage,
+    paths: &Vec<RemotePath>,
+) -> Result<(HashMap<String, Vec<PathAndFlag>>, Vec<RemotePath>)> {
+    let mut grouped_dependencies = HashMap::new();
+    let mut control_files = Vec::new();
+
+    for file in list_all_files(remote_storage, paths).await? {
+        if file.extension().context("bad file name")? == "control" {
+            control_files.push(file.to_owned());
+        } else {
+            let (file_ext_name, subdir_flag) =
+                get_ext_name(file.get_path().to_str().context("invalid path")?)?;
+            let ext_file_list = grouped_dependencies
+                .entry(file_ext_name.to_string())
+                .or_insert(Vec::new());
+            ext_file_list.push(PathAndFlag {
+                path: file.to_owned(),
+                subdir_flag,
+            });
+        }
+    }
+    Ok((grouped_dependencies, control_files))
+}
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index a9d9b5a291..30e86cb12d 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -491,7 +491,7 @@ impl Endpoint {
             pageserver_connstring: Some(pageserver_connstring),
             safekeeper_connstrings,
             storage_auth_token: auth_token.clone(),
-            // This is a hack to test custom extensions locally.
+            // TODO FIXME: This is a hack to test custom extensions locally.
             // In test_download_extensions, we assume that the custom extension
             // prefix is the tenant ID. So we set it here.
             //
diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c
index 16008200ef..01c86867db 100644
--- a/pgxn/neon/extension_server.c
+++ b/pgxn/neon/extension_server.c
@@ -53,14 +53,14 @@ neon_download_extension_file_http(const char *filename, bool is_library)
     }

     compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
-                               extension_server_port, filename, is_library?"?is_library=true":"");
+                               extension_server_port, filename, is_library ? "?is_library=true" : "");
"?is_library=true" : ""); elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */); - + // NOTE: 15L may be insufficient time for large extensions like postgis + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15L /* seconds */); if (curl) { diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index eda4f5f593..b60c2aca12 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -22,7 +22,7 @@ comment = 'This is a mock extension' default_version = '1.0' module_pathname = '$libdir/test_ext{i}' relocatable = true""" - return output + return BytesIO(bytes(output, "utf-8")) def sql_file_content(): @@ -33,7 +33,11 @@ def sql_file_content(): IMMUTABLE RETURNS NULL ON NULL INPUT; """ - return output + return BytesIO(bytes(output, "utf-8")) + + +def fake_library_content(): + return BytesIO(bytes("\n111\n", "utf-8")) # Prepare some mock extension files and upload them to the bucket @@ -47,9 +51,10 @@ def prepare_mock_ext_storage( ): bucket_prefix = ext_remote_storage.prefix_in_bucket custom_prefix = str(tenant_id) - PUB_EXT_ROOT = f"v{pg_version}/share/postgresql/extension" - PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/postgresql/extension" - LOCAL_EXT_ROOT = f"pg_install/{PUB_EXT_ROOT}" + + PUB_EXT_ROOT = f"v{pg_version}/share/extension" + PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/extension" + LOCAL_EXT_ROOT = f"pg_install/{pg_version}/share/postgresql/extension" PUB_LIB_ROOT = f"v{pg_version}/lib" PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib" @@ -70,56 +75,53 @@ def prepare_mock_ext_storage( # Upload several test_ext{i}.control files to the bucket for i in range(NUM_EXT): - public_ext = BytesIO(bytes(control_file_content("public", i), "utf-8")) public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control" - public_local_name = f"{LOCAL_EXT_ROOT}/test_ext{i}.control" - custom_ext = BytesIO(bytes(control_file_content(str(tenant_id), i), "utf-8")) custom_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/custom_ext{i}.control" - custom_local_name = f"{LOCAL_EXT_ROOT}/custom_ext{i}.control" - cleanup_files += [public_local_name, custom_local_name] + cleanup_files += [ + f"{LOCAL_EXT_ROOT}/test_ext{i}.control", + f"{LOCAL_EXT_ROOT}/custom_ext{i}.control", + ] + log.info(f"Uploading control file to {public_remote_name}") remote_storage_client.upload_fileobj( - public_ext, ext_remote_storage.bucket_name, public_remote_name + control_file_content("public", i), ext_remote_storage.bucket_name, public_remote_name ) + log.info(f"Uploading control file to {custom_remote_name}") remote_storage_client.upload_fileobj( - custom_ext, ext_remote_storage.bucket_name, custom_remote_name + control_file_content(str(tenant_id), i), + ext_remote_storage.bucket_name, + custom_remote_name, ) # Upload SQL file for the extension we're going to create sql_filename = "test_ext0--1.0.sql" test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}" - test_sql_local_path = f"{LOCAL_EXT_ROOT}/{sql_filename}" - test_ext_sql_file = BytesIO(bytes(sql_file_content(), "utf-8")) remote_storage_client.upload_fileobj( - test_ext_sql_file, + sql_file_content(), ext_remote_storage.bucket_name, test_sql_public_remote_path, ) - cleanup_files += [test_sql_local_path] + cleanup_files += 
[f"{LOCAL_EXT_ROOT}/{sql_filename}"] # upload some fake library files for i in range(2): - public_library = BytesIO(bytes("\n111\n", "utf-8")) public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so" - public_local_name = f"{LOCAL_LIB_ROOT}/test_lib{i}.so" - custom_library = BytesIO(bytes("\n111\n", "utf-8")) custom_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/custom_lib{i}.so" - custom_local_name = f"{LOCAL_LIB_ROOT}/custom_lib{i}.so" - - log.info(f"uploading library to {public_remote_name}") - log.info(f"uploading library to {custom_remote_name}") + log.info(f"uploading fake library to {public_remote_name}") remote_storage_client.upload_fileobj( - public_library, + fake_library_content(), ext_remote_storage.bucket_name, public_remote_name, ) + + log.info(f"uploading fake library to {custom_remote_name}") remote_storage_client.upload_fileobj( - custom_library, + fake_library_content(), ext_remote_storage.bucket_name, custom_remote_name, ) - cleanup_files += [public_local_name, custom_local_name] + cleanup_files += [f"{LOCAL_LIB_ROOT}/test_lib{i}.so", f"{LOCAL_LIB_ROOT}/custom_lib{i}.so"] return cleanup_files @@ -136,7 +138,8 @@ def prepare_mock_ext_storage( # export AWS_SECURITY_TOKEN='test' # export AWS_SESSION_TOKEN='test' # export AWS_DEFAULT_REGION='us-east-1' -# + + @pytest.mark.parametrize("remote_storage_kind", available_s3_storages()) def test_remote_extensions( neon_env_builder: NeonEnvBuilder,