Handle shutdown

This commit is contained in:
Bojan Serafimov
2022-06-12 12:37:52 -04:00
parent cd081280bf
commit 23d4f12cdd

View File

@@ -240,9 +240,7 @@ impl<'a> Drop for CopyInReader<'a> {
impl<'a> Read for CopyInReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// TODO check if shutdown was requested?
while !thread_mgr::is_shutdown_requested() {
// Return from buffer if nonempty
if self.buf_begin < self.buf.len() {
let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin);
@@ -287,6 +285,10 @@ impl<'a> Read for CopyInReader<'a> {
}
}
}
// Shutting down
// TODO is this fine?
Ok(0)
}
}
@@ -545,19 +547,22 @@ impl PageServerHandler {
base_lsn: Lsn,
_end_lsn: Lsn,
) -> anyhow::Result<()> {
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let _enter =
info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
// TODO thread_mgr::associate_with?
// Create empty timeline
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
let repartition_distance = repo.get_checkpoint_distance(); // TODO
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO mark timeline as not ready until it reaches end_lsn?
// This would prevent compute node from connecting to it and writing conflicting wal.
// TODO leave clean state on error
// Import basebackup provided via CopyData
info!("importing basebackup");
@@ -577,18 +582,20 @@ impl PageServerHandler {
start_lsn: Lsn,
end_lsn: Lsn,
) -> anyhow::Result<()> {
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let _enter =
info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
// TODO thread_mgr::associate_with?
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.get_timeline_load(timeline_id)?;
let repartition_distance = repo.get_checkpoint_distance(); // TODO
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO ensure start_lsn matches current lsn
// TODO leave clean state on error
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
@@ -876,6 +883,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
// Import the `base` section (everything but the wal) of a basebackup.
let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
@@ -889,6 +897,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.
let (_, params_raw) = query_string.split_at("import wal ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);