diff --git a/Cargo.lock b/Cargo.lock index c615766eb8..ddb1267691 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,14 @@ dependencies = [ "backtrace", ] +[[package]] +name = "apis" +version = "0.1.0" +dependencies = [ + "bytes", + "serde", +] + [[package]] name = "arrayvec" version = "0.4.12" @@ -1796,6 +1804,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "apis", "byteorder", "bytes", "chrono", diff --git a/libs/apis/Cargo.toml b/libs/apis/Cargo.toml new file mode 100644 index 0000000000..deae4c4a1e --- /dev/null +++ b/libs/apis/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "apis" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = { version = "1.0.1", features = ['serde'] } +serde = { version = "1.0", features = ["derive"] } diff --git a/libs/apis/src/import.rs b/libs/apis/src/import.rs new file mode 100644 index 0000000000..ba84ef85a1 --- /dev/null +++ b/libs/apis/src/import.rs @@ -0,0 +1,15 @@ +use bytes::Bytes; +use serde::{Serialize, Deserialize}; + +#[derive(Serialize, Deserialize)] +pub enum ImportFeMessage { + // TODO chunk it + File(String, Bytes), + Done +} + +#[derive(Serialize, Deserialize)] +pub enum ImportBeMessage { + Done, + Error(String), +} diff --git a/libs/apis/src/lib.rs b/libs/apis/src/lib.rs new file mode 100644 index 0000000000..07fecca158 --- /dev/null +++ b/libs/apis/src/lib.rs @@ -0,0 +1 @@ +pub mod import; diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 298addb838..73542dc61b 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -58,6 +58,7 @@ postgres_ffi = { path = "../libs/postgres_ffi" } etcd_broker = { path = "../libs/etcd_broker" } metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } +apis = { path = "../libs/apis" } remote_storage = { path = "../libs/remote_storage" } workspace_hack = { version = "0.1", path = "../workspace_hack" } close_fds = "0.3.2" diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6ab9b58796..e0baf5293a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -26,6 +26,8 @@ use utils::{ pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC}, zid::{ZTenantId, ZTimelineId}, }; +use apis::import::{ImportFeMessage, ImportBeMessage}; +use utils::bin_ser::BeSer; use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; @@ -447,6 +449,58 @@ impl PageServerHandler { Ok(()) } + fn handle_import( + &self, + pgb: &mut PostgresBackend, + tenant: ZTenantId, + timeline: ZTimelineId, + ) -> anyhow::Result<()> { + let _enter = info_span!("import", timeline = %timeline, tenant = %tenant).entered(); + // TODO cap the max number of import threads allowed + // TODO check that timeline doesn't exist already + + // NOTE: pagerequests handler exits when connection is closed, + // so there is no need to reset the association + thread_mgr::associate_with(Some(tenant), Some(timeline)); + + // switch client to COPYBOTH + pgb.write_message(&BeMessage::CopyBothResponse)?; + + while !thread_mgr::is_shutdown_requested() { + match pgb.read_message() { + Ok(Some(message)) => { + let copy_data_bytes = match message { + FeMessage::CopyData(bytes) => bytes, + _ => continue, // TODO maybe error? + }; + + let fe_msg = ImportFeMessage::des(©_data_bytes)?; + match fe_msg { + ImportFeMessage::File(name, _file_bytes) => { + info!("Got file {}", name); + // TODO chunk (what does pg_basebackup do?) + // TODO handle + } + ImportFeMessage::Done => { + info!("Done importing."); + pgb.write_message(&BeMessage::CopyData(&ImportBeMessage::Done.ser()?))?; + return Ok(()) + } + } + + }, + Ok(None) => { + todo!("error on none?"); + }, + Err(e) => { + todo!("import error handler not implemented {}", e); + }, + } + } + + Ok(()) + } + /// Helper function to handle the LSN from client request. /// /// Each GetPage (and Exists and Nblocks) request includes information about @@ -731,8 +785,10 @@ impl postgres_backend::Handler for PageServerHandler { let timeline = ZTimelineId::from_str(params[1])?; info!("Importing timeline {}.{}", tenant, timeline); - todo!("timeline import not implemented in pageserver"); + self.handle_import(pgb, tenant, timeline)?; + info!("Done importing timeline {}.{}", tenant, timeline); + // Do I need this? pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'"