Start working on api

This commit is contained in:
Bojan Serafimov
2022-06-08 11:41:30 -04:00
parent a39501beee
commit db4be89577
6 changed files with 93 additions and 1 deletions

9
Cargo.lock generated
View File

@@ -55,6 +55,14 @@ dependencies = [
"backtrace",
]
[[package]]
name = "apis"
version = "0.1.0"
dependencies = [
"bytes",
"serde",
]
[[package]]
name = "arrayvec"
version = "0.4.12"
@@ -1796,6 +1804,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"apis",
"byteorder",
"bytes",
"chrono",

10
libs/apis/Cargo.toml Normal file
View File

@@ -0,0 +1,10 @@
[package]
name = "apis"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = { version = "1.0.1", features = ['serde'] }
serde = { version = "1.0", features = ["derive"] }

15
libs/apis/src/import.rs Normal file
View File

@@ -0,0 +1,15 @@
use bytes::Bytes;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
pub enum ImportFeMessage {
// TODO chunk it
File(String, Bytes),
Done
}
#[derive(Serialize, Deserialize)]
pub enum ImportBeMessage {
Done,
Error(String),
}

1
libs/apis/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod import;

View File

@@ -58,6 +58,7 @@ postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
apis = { path = "../libs/apis" }
remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"

View File

@@ -26,6 +26,8 @@ use utils::{
pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
zid::{ZTenantId, ZTimelineId},
};
use apis::import::{ImportFeMessage, ImportBeMessage};
use utils::bin_ser::BeSer;
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
@@ -447,6 +449,58 @@ impl PageServerHandler {
Ok(())
}
fn handle_import(
&self,
pgb: &mut PostgresBackend,
tenant: ZTenantId,
timeline: ZTimelineId,
) -> anyhow::Result<()> {
let _enter = info_span!("import", timeline = %timeline, tenant = %tenant).entered();
// TODO cap the max number of import threads allowed
// TODO check that timeline doesn't exist already
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
thread_mgr::associate_with(Some(tenant), Some(timeline));
// switch client to COPYBOTH
pgb.write_message(&BeMessage::CopyBothResponse)?;
while !thread_mgr::is_shutdown_requested() {
match pgb.read_message() {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
_ => continue, // TODO maybe error?
};
let fe_msg = ImportFeMessage::des(&copy_data_bytes)?;
match fe_msg {
ImportFeMessage::File(name, _file_bytes) => {
info!("Got file {}", name);
// TODO chunk (what does pg_basebackup do?)
// TODO handle
}
ImportFeMessage::Done => {
info!("Done importing.");
pgb.write_message(&BeMessage::CopyData(&ImportBeMessage::Done.ser()?))?;
return Ok(())
}
}
},
Ok(None) => {
todo!("error on none?");
},
Err(e) => {
todo!("import error handler not implemented {}", e);
},
}
}
Ok(())
}
/// Helper function to handle the LSN from client request.
///
/// Each GetPage (and Exists and Nblocks) request includes information about
@@ -731,8 +785,10 @@ impl postgres_backend::Handler for PageServerHandler {
let timeline = ZTimelineId::from_str(params[1])?;
info!("Importing timeline {}.{}", tenant, timeline);
todo!("timeline import not implemented in pageserver");
self.handle_import(pgb, tenant, timeline)?;
info!("Done importing timeline {}.{}", tenant, timeline);
// Do I need this?
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"