From 34f22e9b12252fe2b8891c2f117ad9cb0edee771 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Wed, 31 May 2023 15:21:57 +0100 Subject: [PATCH] Request extension files from compute_ctl --- compute_tools/src/bin/compute_ctl.rs | 4 +- compute_tools/src/compute.rs | 18 +++-- compute_tools/src/config.rs | 10 ++- compute_tools/src/extension_server.rs | 35 +++++++++ compute_tools/src/http/api.rs | 79 +++++++++++++++++++- compute_tools/src/http/openapi_spec.yaml | 28 +++++++ compute_tools/src/spec.rs | 2 +- docs/rfcs/024-extension-loading.md | 92 +++++++++++++++++++++++ pgxn/neon/Makefile | 1 + pgxn/neon/extension_server.c | 94 ++++++++++++++++++++++++ pgxn/neon/extension_server.h | 1 + pgxn/neon/neon.c | 3 + pgxn/neon/neon.h | 2 + vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- 15 files changed, 361 insertions(+), 12 deletions(-) create mode 100644 compute_tools/src/extension_server.rs create mode 100644 pgxn/neon/extension_server.c create mode 100644 pgxn/neon/extension_server.h diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index c6cfde1d1a..6c2880ba8a 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -184,6 +184,8 @@ fn main() -> Result<()> { let _http_handle = launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); + let extension_server_port: u16 = http_port; + if !spec_set { // No spec provided, hang waiting for it. 
info!("no compute spec provided, waiting"); @@ -224,7 +226,7 @@ fn main() -> Result<()> { // Start Postgres let mut delay_exit = false; let mut exit_code = None; - let pg = match compute.start_compute() { + let pg = match compute.start_compute(extension_server_port) { Ok(pg) => Some(pg), Err(err) => { error!("could not start the compute node: {:?}", err); diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 617b330704..f6a3c87fe1 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -245,14 +245,22 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. #[instrument(skip(self, compute_state))] - pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { + pub fn prepare_pgdata( + &self, + compute_state: &ComputeState, + extension_server_port: u16, + ) -> Result<()> { let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let spec = &pspec.spec; let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. 
self.create_pgdata()?; - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?; + config::write_postgres_conf( + &pgdata_path.join("postgresql.conf"), + &pspec.spec, + Some(extension_server_port), + )?; // Syncing safekeepers is only safe with primary nodes: if a primary // is already connected it will be kicked out, so a secondary (standby) @@ -395,7 +403,7 @@ impl ComputeNode { // Write new config let pgdata_path = Path::new(&self.pgdata); - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?; let mut client = Client::connect(self.connstr.as_str(), NoTls)?; self.pg_reload_conf(&mut client)?; @@ -425,7 +433,7 @@ impl ComputeNode { } #[instrument(skip(self))] - pub fn start_compute(&self) -> Result { + pub fn start_compute(&self, extension_server_port: u16) -> Result { let compute_state = self.state.lock().unwrap().clone(); let spec = compute_state.pspec.as_ref().expect("spec must be set"); info!( @@ -436,7 +444,7 @@ impl ComputeNode { spec.timeline_id, ); - self.prepare_pgdata(&compute_state)?; + self.prepare_pgdata(&compute_state, extension_server_port)?; let start_time = Utc::now(); diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 99346433d0..1b9d5037d5 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result { } /// Create or completely rewrite configuration file specified by `path` -pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> { +pub fn write_postgres_conf( + path: &Path, + spec: &ComputeSpec, + extension_server_port: Option, +) -> Result<()> { // File::create() destroys the file content if it exists. 
let mut file = File::create(path)?; @@ -95,5 +99,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> { writeln!(file, "# Managed by compute_ctl: end")?; } + if let Some(port) = extension_server_port { + writeln!(file, "neon.extension_server_port={}", port)?; + } + Ok(()) } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs new file mode 100644 index 0000000000..6d910874f9 --- /dev/null +++ b/compute_tools/src/extension_server.rs @@ -0,0 +1,35 @@ +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::Path; +use std::str; +use std::{fs, thread}; + +use anyhow::Context; +use tracing::info; + +pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> { + let mut buf = [0; 512]; + + stream.read(&mut buf).expect("Error reading from stream"); + + let filename = str::from_utf8(&buf) + .context("filename is not UTF-8")? + .trim_end(); + + println!("requested file {}", filename); + + let from_prefix = "/tmp/from_prefix"; + let to_prefix = "/tmp/to_prefix"; + + let filepath = Path::new(from_prefix).join(filename); + let copy_to_filepath = Path::new(to_prefix).join(filename); + fs::copy(filepath, copy_to_filepath)?; + + // Write back the response to the TCP stream + match stream.write("OK".as_bytes()) { + Err(e) => anyhow::bail!("Read-Server: Error writing to stream {}", e), + Ok(_) => (), + } + + Ok(()) +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index afd9c2fb54..0ef0efaf46 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -1,7 +1,8 @@ use std::convert::Infallible; use std::net::SocketAddr; +use std::path::Path; use std::sync::Arc; -use std::thread; +use std::{fs, thread}; use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_api::requests::ConfigurationRequest; @@ -121,8 +122,30 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving {:?} POST request", route); + 
+ let filename = route.split("/").last().unwrap(); + + info!( + "serving /extension_server POST request, filename: {:?}", + filename + ); + + match download_file(&filename).await { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("download_file failed: {}", e); + Response::new(Body::from(e.to_string())) + } + } + } + // Return the `404 Not Found` for any other routes. - _ => { + method => { + info!("404 Not Found for {:?}", method); + let mut not_found = Response::new(Body::from("404 Not Found")); *not_found.status_mut() = StatusCode::NOT_FOUND; not_found @@ -130,6 +153,58 @@ async fn routes(req: Request, compute: &Arc) -> Response anyhow::Result<()> { + info!("requested file {}", filename); + + let from_prefix: &str = "/tmp"; //debug only + let to_prefix = "/home/anastasia/work/neon/pg_install/v15/"; + + if filename.ends_with(".so") { + info!("requested file is a shared object file {}", filename); + + let from_path = Path::new(from_prefix).join("lib").join(filename); + let to_path = Path::new(to_prefix).join("lib").join(filename); + + info!( + "copying file {} from {} to {}", + filename, + from_path.display(), + to_path.display() + ); + + fs::copy(from_path, to_path)?; + } else { + info!("requested all extension files with prefix {}", filename); + + let from_path = Path::new(from_prefix).join("share/extension/"); + let to_path = Path::new(to_prefix).join("share/postgresql/extension/"); + + info!( + "copying files from {} to {}", + from_path.display(), + to_path.display() + ); + + for file in fs::read_dir(from_path.clone()).unwrap().flatten() { + let fname = file.file_name().into_string().unwrap(); + + if fname.starts_with(filename) && fname.ends_with(".sql") { + info!( + "copying file {} from {} to {}", + fname, + from_path.display(), + to_path.display() + ); + + fs::copy(file.path(), to_path.join(fname))?; + } + } + } + + Ok(()) +} + async fn handle_configure_request( req: Request, compute: &Arc, diff --git 
a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 2680269756..dc26cc63eb 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -139,6 +139,34 @@ paths: application/json: schema: $ref: "#/components/schemas/GenericError" + /extension_server: + post: + tags: + - Extension + summary: Download extension from S3 to local folder. + description: "" + operationId: downloadExtension + responses: + 200: + description: Extension downloaded + content: + text/plain: + schema: + type: string + description: Error text or 'OK' if download succeeded. + example: "OK" + 400: + description: Request is invalid. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + 500: + description: Extension download request failed. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" components: securitySchemes: diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index a2a19ae0da..1299727902 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane( pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> { // File `postgresql.conf` is no longer included into `basebackup`, so just // always write all config into it creating new file. - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?; update_pg_hba(pgdata_path)?; diff --git a/docs/rfcs/024-extension-loading.md b/docs/rfcs/024-extension-loading.md index b7aeaeb2d3..ecd584d8e3 100644 --- a/docs/rfcs/024-extension-loading.md +++ b/docs/rfcs/024-extension-loading.md @@ -207,3 +207,95 @@ Cons: The extension store does not have to be S3 directly, but could be a Node-local caching service on top of S3. This would reduce the load on the network for popular extensions. 
+ +## Extension Store implementation + +The Extension Store in our case is a private S3 bucket. +Extensions are stored as tarballs in the bucket. The tarball contains the extension's control file and all the files that the extension needs to run. + +We may also store the control file separately from the tarball to speed up the extension loading. + +`s3:///extensions/ext-name/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar` + +where `ext-name` is an extension name and `sha-256+1234abcd1234abcd1234abcd1234abcd` is a hash of a specific extension version tarball. + +To ensure security, there is no direct access to the S3 bucket from the compute node. + +The control plane forms a list of extensions available to the compute node +and forms a short-lived [pre-signed URL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html) +for each extension that is available to the compute node. + +So, `compute_ctl` receives the spec in the following format: + +``` +"extensions": [{ + "meta_format": 1, + "extension_name": "postgis", + "link": "https:///extensions/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar?AWSAccessKeyId=1234abcd1234abcd1234abcd1234abcd&Expires=1234567890&Signature=1234abcd1234abcd1234abcd1234abcd", + ... +}] +``` + +`compute_ctl` then downloads the extension from the link and unpacks it to the right place. + +### How do we handle private extensions? + +Private and public extensions are treated equally from the Extension Store perspective. +The only difference is that the private extensions are not listed in the user UI (managed by the control plane). + +### How to add a new extension to the Extension Store? + +Since we need to verify that the extension is compatible with the compute node and doesn't contain any malicious code, +we need to review the extension before adding it to the Extension Store. + +I do not expect that we will have a lot of extensions to review, so we can do it manually for now. 
+ +Some admin UI may be added later to automate this process. + +The list of extensions available to a compute node is stored in the console database. + +### How is the list of available extensions managed? + +We need to add new tables to the console database to store the list of available extensions, their versions and access rights. + +Something like this: + +``` +CREATE TABLE extensions ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + version VARCHAR(255) NOT NULL, + hash VARCHAR(255) NOT NULL, -- this is the path to the extension in the Extension Store + supported_postgres_versions integer[] NOT NULL, + is_public BOOLEAN NOT NULL, -- public extensions are available to all users + is_shared_preload BOOLEAN NOT NULL, -- these extensions require postgres restart + is_preload BOOLEAN NOT NULL, + license VARCHAR(255) NOT NULL +); + +CREATE TABLE user_extensions ( + user_id INTEGER NOT NULL, + extension_id INTEGER NOT NULL, + FOREIGN KEY (user_id) REFERENCES users (id), + FOREIGN KEY (extension_id) REFERENCES extensions (id) +); +``` + +When a new extension is added to the Extension Store, we add a new record to the table and set permissions. + +In the UI, a user may select the extensions that they want to use with their compute node. + +NOTE: Extensions that require postgres restart will not be available until the next compute restart. +Also, currently a user cannot force a postgres restart. We should add this feature later. + +For other extensions, we must communicate updates to `compute_ctl` and they will be downloaded in the background. + +### How can a user update the extension? + +A user can update the extension by selecting the new version of the extension in the UI. + +### Alternatives + +For extensions written in trusted languages we can also adopt +the `dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase. +This will increase the number of supported extensions and decrease the amount of work required to support them. 
diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 1948023472..53917d8bc4 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -4,6 +4,7 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ + extension_server.o \ file_cache.o \ libpagestore.o \ libpqwalproposer.o \ diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c new file mode 100644 index 0000000000..9028ac7218 --- /dev/null +++ b/pgxn/neon/extension_server.c @@ -0,0 +1,94 @@ + +/*------------------------------------------------------------------------- + * + * extension_server.c + * Request compute_ctl to download extension files. + * + * IDENTIFICATION + * contrib/neon/extension_server.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "tcop/pquery.h" +#include "tcop/utility.h" +#include "access/xact.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "commands/defrem.h" +#include "miscadmin.h" +#include "utils/acl.h" +#include "fmgr.h" +#include "utils/guc.h" +#include "port.h" +#include "fmgr.h" + +#include + + +static int extension_server_port = 0; + +static download_extension_file_hook_type prev_download_extension_file_hook = NULL; + +// curl -X POST http://localhost:8080/extension_server/postgis-3.so +static bool +neon_download_extension_file_http(const char *filename) +{ + CURL *curl; + CURLcode res; + char * compute_ctl_url; + char *postdata = ""; + bool ret = false; + + if ((curl = curl_easy_init()) == NULL) + { + elog(ERROR, "Failed to initialize curl handle"); + } + + compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s", extension_server_port, filename); + + elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url); + + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */ ); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postdata); + + if(curl) + { + /* Perform 
the request, res will get the return code */ + res = curl_easy_perform(curl); + /* Check for errors */ + if(res == CURLE_OK) + { + elog(LOG, "curl_easy_perform() succeeded"); + ret = true; + } + else + { + elog(WARNING, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); + } + + /* always cleanup */ + curl_easy_cleanup(curl); + } + + return ret; +} + +void +pg_init_extension_server() +{ + DefineCustomIntVariable("neon.extension_server_port", + "connection string to the compute_ctl", + NULL, + &extension_server_port, + 0,0,INT_MAX, + PGC_POSTMASTER, + 0, /* no flags required */ + NULL, NULL, NULL); + + //set download_extension_file_hook + prev_download_extension_file_hook = download_extension_file_hook; + download_extension_file_hook = neon_download_extension_file_http; +} diff --git a/pgxn/neon/extension_server.h b/pgxn/neon/extension_server.h new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/pgxn/neon/extension_server.h @@ -0,0 +1 @@ + diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b45d7cfc32..c7211ea05a 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -35,8 +35,11 @@ _PG_init(void) { pg_init_libpagestore(); pg_init_walproposer(); + InitControlPlaneConnector(); + pg_init_extension_server(); + // Important: This must happen after other parts of the extension // are loaded, otherwise any settings to GUCs that were set before // the extension was loaded will be removed. diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 60d321a945..2610da4311 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -21,6 +21,8 @@ extern char *neon_tenant; extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); +extern void pg_init_extension_server(void); + /* * Returns true if we shouldn't do REDO on that block in record indicated by * block_id; false otherwise. 
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index a2daebc6b4..e638045f47 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit a2daebc6b445dcbcca9c18e1711f47c1db7ffb04 +Subproject commit e638045f47cc2e5dca8d2bd721281085b1d3eb8f diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 2df2ce3744..21eebac3d5 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 2df2ce374464a7449e15dfa46c956b73b4f4098b +Subproject commit 21eebac3d507e6e55fabf6ef057662dea90b000a