diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index fae9ac9319..8ce418f3ae 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -49,7 +49,7 @@ use compute_api::responses::ComputeStatus;
 use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
 use compute_tools::configurator::launch_configurator;
-use compute_tools::extension_server::{download_extension, init_remote_storage, ExtensionType};
+use compute_tools::extension_server::{get_availiable_extensions, init_remote_storage};
 use compute_tools::http::api::launch_http_server;
 use compute_tools::logger::*;
 use compute_tools::monitor::launch_monitor;
@@ -76,13 +76,6 @@ fn main() -> Result<()> {
         None => None,
     };

-    let rt = Runtime::new().unwrap();
-    rt.block_on(async {
-        download_extension(&ext_remote_storage, ExtensionType::Shared, pgbin)
-            .await
-            .expect("download shared extensions should work");
-    });
-
     let http_port = *matches
         .get_one::<u16>("http-port")
         .expect("http-port is required");
@@ -201,7 +194,9 @@ fn main() -> Result<()> {
         live_config_allowed,
         state: Mutex::new(new_state),
         state_changed: Condvar::new(),
-        ext_remote_storage: ext_remote_storage.clone(),
+        ext_remote_storage,
+        availiable_extensions: Vec::new(),
+        availiable_libraries: Vec::new(),
     };

     let compute = Arc::new(compute_node);
@@ -212,6 +207,19 @@ fn main() -> Result<()> {

     let extension_server_port: u16 = http_port;

+    // Even before we have the spec, we can fetch the publicly available extensions.
+    // TODO: maybe convert get_availiable_extensions into a ComputeNode method, like the other helpers.
+    let rt = Runtime::new().unwrap();
+    let copy_remote_storage = compute.ext_remote_storage.clone();
+
+    if let Some(remote_storage) = copy_remote_storage {
+        rt.block_on(async move {
+            get_availiable_extensions(&remote_storage, pgbin, None)
+                .await
+                .unwrap();
+        });
+    }
+
     if !spec_set {
         // No spec provided, hang waiting for it.
         info!("no compute spec provided, waiting");
@@ -259,6 +267,17 @@ fn main() -> Result<()> {
     let _configurator_handle =
         launch_configurator(&compute).expect("cannot launch configurator thread");

+    // download private tenant extensions before Postgres starts
+    // TODO
+    // compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
+    //                                                                pg_version, // TODO
+    //                                                                pgbin,
+    //                                                                tenant_id); // TODO: get tenant_id from the spec
+
+    // download preload shared libraries before Postgres starts (if any)
+    // TODO
+    // download_library_file();
+
     // Start Postgres
     let mut delay_exit = false;
     let mut exit_code = None;
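A minimal, self-contained sketch (not part of the diff) of the bucket layout that get_availiable_extensions, called above and defined later in this diff, expects: the public path has no tenant component, while the per-tenant path from the TODO inserts the tenant id after the Postgres version. std::path stands in for RemotePath here, and the concrete version and tenant values are illustrative.

use std::path::{Path, PathBuf};

// Public control files live under <pg_version>/share/postgresql/extension;
// the per-tenant variant (the TODO case) adds the tenant id after the version.
fn remote_share_dir(pg_version: &str, tenant_id: Option<&str>) -> PathBuf {
    match tenant_id {
        None => Path::new(pg_version).join("share/postgresql/extension"),
        Some(tenant) => Path::new(pg_version)
            .join(tenant)
            .join("share/postgresql/extension"),
    }
}

fn main() {
    assert_eq!(
        remote_share_dir("v14", None),
        Path::new("v14/share/postgresql/extension")
    );
    assert_eq!(
        remote_share_dir("v14", Some("some-tenant-id")),
        Path::new("v14/some-tenant-id/share/postgresql/extension")
    );
}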
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 8cf030f722..2d21f08161 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -16,11 +16,11 @@ use utils::lsn::Lsn;
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
 use compute_api::spec::{ComputeMode, ComputeSpec};
-use remote_storage::GenericRemoteStorage;
+use remote_storage::{GenericRemoteStorage, RemotePath};

-use crate::config;
 use crate::pg_helpers::*;
 use crate::spec::*;
+use crate::{config, extension_server};

 /// Compute node info shared across several `compute_ctl` threads.
 pub struct ComputeNode {
@@ -49,6 +49,8 @@ pub struct ComputeNode {
     pub state_changed: Condvar,
     /// S3 extensions configuration variables
     pub ext_remote_storage: Option<GenericRemoteStorage>,
+    pub availiable_extensions: Vec<RemotePath>,
+    pub availiable_libraries: Vec<RemotePath>,
 }

 #[derive(Clone, Debug)]
@@ -579,4 +581,28 @@ LIMIT 100",
             "{{\"pg_stat_statements\": []}}".to_string()
         }
     }
+
+    pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
+        match &self.ext_remote_storage {
+            None => anyhow::bail!("No remote extension storage"),
+            Some(remote_storage) => {
+                extension_server::download_extension_sql_files(
+                    &filename,
+                    &remote_storage,
+                    &self.pgbin,
+                )
+                .await
+            }
+        }
+    }
+
+    pub async fn download_library_file(&self, filename: String) -> Result<()> {
+        match &self.ext_remote_storage {
+            None => anyhow::bail!("No remote extension storage"),
+            Some(remote_storage) => {
+                extension_server::download_library_file(&filename, &remote_storage, &self.pgbin)
+                    .await
+            }
+        }
+    }
 }
diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs
index 4ee304f583..742dbf34eb 100644
--- a/compute_tools/src/extension_server.rs
+++ b/compute_tools/src/extension_server.rs
@@ -1,4 +1,4 @@
-use anyhow::{self, bail};
+use anyhow::{self, bail, Result};
 use remote_storage::*;
 use serde_json::{self, Value};
 use std::fs::File;
@@ -8,6 +8,7 @@ use std::path::{Path, PathBuf};
 use std::str;
 use tokio::io::AsyncReadExt;
 use tracing::info;
+use utils::id::TenantId;

 fn get_pg_config(argument: &str, pgbin: &str) -> String {
     // gives the result of `pg_config [argument]`
@@ -57,60 +58,117 @@ async fn download_helper(
     Ok(())
 }

-pub enum ExtensionType {
-    Shared,          // we just use the public folder here
-    Tenant(String),  // String is tenant_id
-    Library(String), // String is name of the extension
-}
-
-pub async fn download_extension(
-    remote_storage: &Option<GenericRemoteStorage>,
-    ext_type: ExtensionType,
+// Download extension control files.
+//
+// Returns the list of all extension files so that it can be reused for later lookups.
+//
+// If tenant_id is provided, search the private per-tenant extension path;
+// otherwise search the public extension path.
+//
+pub async fn get_availiable_extensions(
+    remote_storage: &GenericRemoteStorage,
     pgbin: &str,
-) -> anyhow::Result<()> {
-    let remote_storage = match remote_storage {
-        Some(remote_storage) => remote_storage,
-        None => return Ok(()),
-    };
+    tenant_id: Option<TenantId>,
+) -> anyhow::Result<Vec<RemotePath>> {
+    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     let pg_version = get_pg_version(pgbin);

-    match ext_type {
-        ExtensionType::Shared => {
-            // 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension
-            //    We can do this step even before we have spec,
-            //    because public extensions are common for all projects.
-            let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-            let remote_sharedir = Path::new(&pg_version).join("share/postgresql/extension");
-            let remote_sharedir = RemotePath::new(Path::new(&remote_sharedir))?;
-            let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
-            for remote_from_path in from_paths {
-                if remote_from_path.extension() == Some("control") {
-                    download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
-                }
-            }
-        }
-        ExtensionType::Tenant(tenant_id) => {
-            // 2. After we have spec, before project start
-            //    Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension
-            let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-            let remote_path = RemotePath::new(Path::new(&tenant_id.to_string()))?;
-            let from_paths = remote_storage.list_files(Some(&remote_path)).await?;
-            for remote_from_path in from_paths {
-                if remote_from_path.extension() == Some("control") {
-                    download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
-                }
-            }
-        }
-        ExtensionType::Library(library_name) => {
-            // 3. After we have spec, before postgres start
-            //    Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/
+    let remote_sharedir = match tenant_id {
+        None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
+        Some(tenant_id) => RemotePath::new(
+            &Path::new(&pg_version)
+                .join(&tenant_id.to_string())
+                .join("share/postgresql/extension"),
+        )?,
+    };

-            let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
-            let remote_path = format!("{library_name}.control");
-            let remote_from_path = RemotePath::new(Path::new(&remote_path))?;
-            download_helper(remote_storage, &remote_from_path, &local_libdir).await?;
+    info!(
+        "get_availiable_extensions remote_sharedir: {:?}, local_sharedir: {:?}",
+        remote_sharedir, local_sharedir
+    );
+
+    let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
+
+    // download all found control files
+    for remote_from_path in &from_paths {
+        if remote_from_path.extension() == Some("control") {
+            download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
         }
     }
+
+    Ok(from_paths)
+}
+
+// Download all SQL files for a given extension name.
+//
+pub async fn download_extension_sql_files(
+    ext_name: &str,
+    //availiable_extensions: &Vec<RemotePath>,
+    remote_storage: &GenericRemoteStorage,
+    pgbin: &str,
+) -> Result<()> {
+    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
+
+    let pg_version = get_pg_version(pgbin);
+    let remote_sharedir =
+        RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension")).unwrap();
+
+    // TODO: cache the availiable_extensions list after the first read to avoid unneeded S3 calls
+    let availiable_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?;
+
+    info!(
+        "list of available extension files {:?}",
+        &availiable_extensions
+    );
+
+    // check if extension files exist
+    let files_to_download: Vec<&RemotePath> = availiable_extensions
+        .iter()
+        .filter(|ext| {
+            ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name)
+        })
+        .collect();
+
+    if files_to_download.is_empty() {
+        bail!("Files for extension {ext_name} are not found in the extension store");
+    }
+
+    for remote_from_path in files_to_download {
+        download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
+    }
+
+    Ok(())
+}
+
+// Download a shared library file.
+pub async fn download_library_file(
+    lib_name: &str,
+    // availiable_libraries: &Vec<RemotePath>,
+    remote_storage: &GenericRemoteStorage,
+    pgbin: &str,
+) -> Result<()> {
+    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
+
+    let pg_version = get_pg_version(pgbin);
+    let remote_sharedir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
+
+    // TODO: cache the availiable_libraries list after the first read to avoid unneeded S3 calls
+    let availiable_libraries = remote_storage.list_files(Some(&remote_sharedir)).await?;
+
+    info!("list of library files {:?}", &availiable_libraries);
+
+    // check if the library file exists
+    let lib = availiable_libraries
+        .iter()
+        .find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);
+
+    match lib {
+        None => bail!("Shared library file {lib_name} is not found in the extension store"),
+        Some(lib) => {
+            download_helper(remote_storage, &lib, &local_libdir).await?;
+        }
+    }
+    Ok(())
 }
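A self-contained illustration (not part of the diff) of the file-selection rule used by download_extension_sql_files above: keep *.sql objects whose name starts with the requested extension name. std::path::Path stands in for RemotePath, and the file names are made up.

use std::path::Path;

// Mirrors the filter in download_extension_sql_files: *.sql files whose
// object name starts with the requested extension name are downloaded.
fn wanted_sql_file(object_name: &str, ext_name: &str) -> bool {
    let path = Path::new(object_name);
    path.extension().map(|e| e == "sql").unwrap_or(false) && object_name.starts_with(ext_name)
}

fn main() {
    assert!(wanted_sql_file("test_ext--1.0.sql", "test_ext"));
    assert!(wanted_sql_file("test_ext--1.0--1.1.sql", "test_ext"));
    // Control files are handled by get_availiable_extensions, not here.
    assert!(!wanted_sql_file("test_ext.control", "test_ext"));
    println!("selection rule behaves as expected");
}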
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index f3e126868e..264312f01c 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -16,7 +16,7 @@ use tokio::task;
 use tracing::{error, info};
 use tracing_utils::http::OtelName;

-use crate::extension_server::{self, ExtensionType};
+use crate::extension_server::{download_extension_sql_files, download_library_file};

 fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
     ComputeStatusResponse {
@@ -126,25 +126,51 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
             info!("serving {:?} POST request", route);
+            info!("req.uri {:?}", req.uri());
+
+            let mut is_library = false;
+
+            if let Some(params) = req.uri().query() {
+                info!("serving {:?} POST request with params: {}", route, params);
+
+                if params == "is_library=true" {
+                    is_library = true;
+                } else {
+                    let mut resp = Response::new(Body::from("Wrong request parameters"));
+                    *resp.status_mut() = StatusCode::BAD_REQUEST;
+                    return resp;
+                }
+            }

             let filename = route.split('/').last().unwrap().to_string();

             info!(
-                "serving /extension_server POST request, filename: {:?}",
-                &filename
+                "serving /extension_server POST request, filename: {:?} is_library: {}",
+                filename, is_library
             );

-            match extension_server::download_extension(
-                &compute.ext_remote_storage,
-                ExtensionType::Library(filename),
-                &compute.pgbin,
-            )
-            .await
-            {
-                Ok(_) => Response::new(Body::from("OK")),
-                Err(e) => {
-                    error!("download_extension failed: {}", e);
-                    Response::new(Body::from(e.to_string()))
+            if is_library {
+                match compute.download_library_file(filename.to_string()).await {
+                    Ok(_) => Response::new(Body::from("OK")),
+                    Err(e) => {
+                        error!("library download failed: {}", e);
+                        let mut resp = Response::new(Body::from(e.to_string()));
+                        *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
+                        resp
+                    }
+                }
+            } else {
+                match compute
+                    .download_extension_sql_files(filename.to_string())
+                    .await
+                {
+                    Ok(_) => Response::new(Body::from("OK")),
+                    Err(e) => {
+                        error!("extension download failed: {}", e);
+                        let mut resp = Response::new(Body::from(e.to_string()));
+                        *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
+                        resp
+                    }
+                }
             }
         }
     }
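A standalone sketch (not part of the diff) of the two request shapes the handler above accepts; it mirrors the URL built with psprintf() by the neon extension in the next file. The port and file names are taken from the curl examples in that comment and are illustrative.

// Build the extension_server URL the way pgxn/neon/extension_server.c does:
// an optional "?is_library=true" selects the shared-library code path.
fn extension_server_url(port: u16, filename: &str, is_library: bool) -> String {
    format!(
        "http://localhost:{}/extension_server/{}{}",
        port,
        filename,
        if is_library { "?is_library=true" } else { "" }
    )
}

fn main() {
    // All SQL files for an extension (no query parameter):
    assert_eq!(
        extension_server_url(8080, "postgis", false),
        "http://localhost:8080/extension_server/postgis"
    );
    // A single shared library (is_library=true):
    assert_eq!(
        extension_server_url(8080, "postgis-3.so", true),
        "http://localhost:8080/extension_server/postgis-3.so?is_library=true"
    );
}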
diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c
index 1067e49914..4dce791068 100644
--- a/pgxn/neon/extension_server.c
+++ b/pgxn/neon/extension_server.c
@@ -29,9 +29,13 @@ static int extension_server_port = 0;

 static download_extension_file_hook_type prev_download_extension_file_hook = NULL;

-// curl -X POST http://localhost:8080/extension_server/postgis-3.so
+// To download all SQL files for an extension:
+// curl -X POST http://localhost:8080/extension_server/postgis
+//
+// To download a specific library file:
+// curl -X POST 'http://localhost:8080/extension_server/postgis-3.so?is_library=true'
 static bool
-neon_download_extension_file_http(const char *filename)
+neon_download_extension_file_http(const char *filename, bool is_library)
 {
 	CURL	   *curl;
 	CURLcode	res;
@@ -44,7 +48,14 @@ neon_download_extension_file_http(const char *filename)
 		elog(ERROR, "Failed to initialize curl handle");
 	}

-	compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s", extension_server_port, filename);
+
+	if (is_library)
+	{
+		elog(LOG, "request library");
+	}
+
+	compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
+							   extension_server_port, filename, is_library ? "?is_library=true" : "");

 	elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url);

@@ -52,6 +63,7 @@ neon_download_extension_file_http(const char *filename)
 	curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
 	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);

+
 	if (curl)
 	{
 		/* Perform the request, res will get the return code */
diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py
index 833ab9dd65..b9509c6882 100644
--- a/test_runner/regress/test_download_extensions.py
+++ b/test_runner/regress/test_download_extensions.py
@@ -82,12 +82,37 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
         )
         cleanup_files.append(local_name)

-    # Rust will then download the control files from the bucket
-    # our rust code should obtain the same result as the following:
-    # env.remote_storage_client.get_object(
-    #     Bucket=env.ext_remote_storage.bucket_name,
-    #     Key=os.path.join(BUCKET_PREFIX, PUB_EXT_PATHS[0])
-    # )["Body"].read()
+    TEST_EXT_SQL_PATH = "v14/share/postgresql/extension/test_ext--1.0.sql"
+    test_ext_sql_file = BytesIO(
+        b"""
+        CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer
+        AS 'select $1 + $2;'
+        LANGUAGE SQL
+        IMMUTABLE
+        RETURNS NULL ON NULL INPUT;
+    """
+    )
+    env.remote_storage_client.upload_fileobj(
+        test_ext_sql_file,
+        env.ext_remote_storage.bucket_name,
+        os.path.join(BUCKET_PREFIX, TEST_EXT_SQL_PATH),
+    )
+
+    # upload some fake library file
+    TEST_LIB_PATH = "v14/lib/test_ext.so"
+    test_lib_file = BytesIO(
+        b"""
+        111
+    """
+    )
+    env.remote_storage_client.upload_fileobj(
+        test_lib_file,
+        env.ext_remote_storage.bucket_name,
+        os.path.join(BUCKET_PREFIX, TEST_LIB_PATH),
+    )
+
+    tenant, _ = env.neon_cli.create_tenant()
+    env.neon_cli.create_timeline("test_file_download", tenant_id=tenant)

     remote_ext_config = json.dumps(
         {
@@ -99,17 +124,13 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
     )

     endpoint = env.endpoints.create_start(
-        "test_file_download", tenant_id=tenant_id, remote_ext_config=remote_ext_config
+        "test_file_download",
+        tenant_id=tenant,
+        remote_ext_config=remote_ext_config,
+        config_lines=["log_min_messages=debug3"],
     )

     with closing(endpoint.connect()) as conn:
         with conn.cursor() as cur:
-            # example query: insert some values and select them
-            cur.execute("CREATE TABLE t(key int primary key, value text)")
-            for i in range(100):
-                cur.execute(f"insert into t values({i}, {2*i})")
-            cur.execute("select * from t")
-            log.info(cur.fetchall())
-
             # Test query: check that test_ext0 was successfully downloaded
             cur.execute("SELECT * FROM pg_available_extensions")
             all_extensions = [x[0] for x in cur.fetchall()]
@@ -118,10 +139,24 @@
                 assert f"test_ext{i}" in all_extensions
                 assert f"private_ext{i}" in all_extensions

-            # TODO: can create extension actually install an extension?
- # cur.execute("CREATE EXTENSION test_ext0") - # log.info("**" * 100) - # log.info(cur.fetchall()) + cur.execute("CREATE EXTENSION test_ext") + + cur.execute("SELECT extname FROM pg_extension") + all_extensions = [x[0] for x in cur.fetchall()] + log.info(all_extensions) + assert "test_ext" in all_extensions + + try: + cur.execute("LOAD 'test_ext.so'") + except Exception as e: + # expected to fail with + # could not load library ... test_ext.so: file too short + # because test_ext.so is not real library file + log.info("LOAD test_ext.so failed (expectedly): %s", e) + assert "file too short" in str(e) + + # TODO add more test cases: + # - try to load non-existing library # cleanup downloaded extensions (TODO: the file names are quesionable here) for file in cleanup_files: @@ -130,3 +165,4 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re os.remove(file) except FileNotFoundError: log.info(f"{file} does not exist, so cannot be deleted") + assert "test_ext" in all_extensions diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 5adfb36043..d24adf080e 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 5adfb36043348fc7700a5d8d68f925a912f01b87 +Subproject commit d24adf080e2d13bb3a25a4836903cbced7a01234 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index ff7b85cd8a..b11cb5c762 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit ff7b85cd8a089274a1394df369cf8692a33ec79e +Subproject commit b11cb5c762659ce831ae56df50684e0e9af5ecc9