mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 00:12:54 +00:00
fmt + clippy
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Read, Write};
|
||||
use std::io::{BufReader, Write};
|
||||
use std::net::TcpStream;
|
||||
use std::num::NonZeroU64;
|
||||
use std::path::PathBuf;
|
||||
@@ -553,7 +553,8 @@ impl PageServerNode {
|
||||
};
|
||||
|
||||
// Import base
|
||||
let import_cmd = format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
|
||||
let import_cmd =
|
||||
format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
|
||||
let mut writer = client.copy_in(&import_cmd)?;
|
||||
io::copy(&mut base_reader, &mut writer)?;
|
||||
writer.finish()?;
|
||||
|
||||
@@ -626,26 +626,32 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
"Created timeline '{}' at Lsn {} for tenant: {}",
|
||||
timeline.timeline_id, last_record_lsn, tenant_id,
|
||||
);
|
||||
},
|
||||
}
|
||||
Some(("import", import_match)) => {
|
||||
let tenant_id = get_tenant_id(import_match, env)?;
|
||||
let timeline_id = parse_timeline_id(import_match)?
|
||||
.expect("No timeline id provided");
|
||||
let name = import_match.value_of("node-name")
|
||||
let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
|
||||
let name = import_match
|
||||
.value_of("node-name")
|
||||
.ok_or_else(|| anyhow!("No node name provided"))?;
|
||||
|
||||
// Parse base inputs
|
||||
let base_tarfile = import_match.value_of("base-tarfile")
|
||||
let base_tarfile = import_match
|
||||
.value_of("base-tarfile")
|
||||
.map(|s| PathBuf::from_str(s).unwrap())
|
||||
.ok_or_else(|| anyhow!("No base-tarfile provided"))?;
|
||||
let base_lsn = Lsn::from_str(import_match.value_of("base-lsn")
|
||||
.ok_or_else(|| anyhow!("No base-lsn provided"))?)?;
|
||||
let base_lsn = Lsn::from_str(
|
||||
import_match
|
||||
.value_of("base-lsn")
|
||||
.ok_or_else(|| anyhow!("No base-lsn provided"))?,
|
||||
)?;
|
||||
let base = (base_lsn, base_tarfile);
|
||||
|
||||
// Parse pg_wal inputs
|
||||
let wal_tarfile = import_match.value_of("wal-tarfile")
|
||||
let wal_tarfile = import_match
|
||||
.value_of("wal-tarfile")
|
||||
.map(|s| PathBuf::from_str(s).unwrap());
|
||||
let end_lsn = import_match.value_of("end-lsn")
|
||||
let end_lsn = import_match
|
||||
.value_of("end-lsn")
|
||||
.map(|s| Lsn::from_str(s).unwrap());
|
||||
// TODO validate both or none are provided
|
||||
let pg_wal = end_lsn.zip(wal_tarfile);
|
||||
@@ -657,7 +663,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
|
||||
println!("Done");
|
||||
},
|
||||
}
|
||||
Some(("branch", branch_match)) => {
|
||||
let tenant_id = get_tenant_id(branch_match, env)?;
|
||||
let new_branch_name = branch_match
|
||||
|
||||
@@ -6,10 +6,9 @@ use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, Bytes};
|
||||
use bytes::Bytes;
|
||||
use tracing::*;
|
||||
|
||||
use crate::pgdatadir_mapping::*;
|
||||
@@ -159,7 +158,6 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
fn import_relfile<R: Repository>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
path: &Path,
|
||||
@@ -429,7 +427,6 @@ fn import_wal<R: Repository>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
|
||||
tline: &mut DatadirTimeline<R>,
|
||||
reader: Reader,
|
||||
@@ -441,7 +438,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
|
||||
|
||||
// Import base
|
||||
for base_tar_entry in tar::Archive::new(reader).entries()? {
|
||||
let mut entry = base_tar_entry.unwrap();
|
||||
let entry = base_tar_entry.unwrap();
|
||||
let header = entry.header();
|
||||
let len = header.entry_size()? as usize;
|
||||
let file_path = header.path().unwrap().into_owned();
|
||||
@@ -451,17 +448,17 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
|
||||
// let mut buffer = Vec::new();
|
||||
// entry.read_to_end(&mut buffer).unwrap();
|
||||
|
||||
import_file(&mut modification, &file_path.as_ref(), entry, len)?;
|
||||
},
|
||||
import_file(&mut modification, file_path.as_ref(), entry, len)?;
|
||||
}
|
||||
tar::EntryType::Directory => {
|
||||
info!("directory {:?}", file_path);
|
||||
if file_path.starts_with("pg_wal") {
|
||||
info!("found pg_wal in base lol");
|
||||
}
|
||||
},
|
||||
}
|
||||
_ => {
|
||||
panic!("tar::EntryType::?? {}", file_path.display());
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -475,7 +472,6 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
|
||||
// Set up walingest mutable state
|
||||
let mut waldecoder = WalStreamDecoder::new(start_lsn);
|
||||
let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
@@ -579,7 +575,7 @@ pub fn import_file<R: Repository, Reader: Read>(
|
||||
}
|
||||
"PG_VERSION" => {
|
||||
info!("ignored");
|
||||
},
|
||||
}
|
||||
_ => {
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
|
||||
info!("imported rel creation");
|
||||
@@ -589,8 +585,7 @@ pub fn import_file<R: Repository, Reader: Read>(
|
||||
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
|
||||
let dbnode: u32 = file_path
|
||||
.iter()
|
||||
.skip(1)
|
||||
.next()
|
||||
.nth(1)
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.parse()
|
||||
@@ -604,7 +599,7 @@ pub fn import_file<R: Repository, Reader: Read>(
|
||||
}
|
||||
"PG_VERSION" => {
|
||||
info!("ignored");
|
||||
},
|
||||
}
|
||||
_ => {
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
|
||||
info!("imported rel creation");
|
||||
|
||||
@@ -13,10 +13,8 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::{self, Read};
|
||||
use std::net::TcpListener;
|
||||
use std::path::PathBuf;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
@@ -31,7 +29,7 @@ use utils::{
|
||||
|
||||
use crate::basebackup;
|
||||
use crate::config::{PageServerConf, ProfilingConfig};
|
||||
use crate::import_datadir::{import_basebackup_from_tar, import_timeline_from_postgres_datadir, import_wal_from_tar};
|
||||
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
|
||||
use crate::profiling::profpoint_start;
|
||||
@@ -42,7 +40,6 @@ use crate::tenant_mgr;
|
||||
use crate::thread_mgr;
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
use crate::CheckpointConfig;
|
||||
use crate::timelines::create_timeline;
|
||||
use metrics::{register_histogram_vec, HistogramVec};
|
||||
use postgres_ffi::xlog_utils::to_pg_timestamp;
|
||||
|
||||
@@ -211,7 +208,14 @@ struct CopyInReader<'a> {
|
||||
|
||||
/// Overflow buffer for bytes sent in CopyData messages
|
||||
/// that the reader (caller of read) hasn't asked for yet.
|
||||
buf: VecDeque<u8>,
|
||||
/// TODO use BytesMut?
|
||||
buf: Vec<u8>,
|
||||
|
||||
/// Bytes before `buf_begin` are considered as dropped.
|
||||
/// This allows us to implement O(1) pop_front on Vec<u8>.
|
||||
/// The Vec won't grow large because we only add to it
|
||||
/// when it's empty.
|
||||
buf_begin: usize,
|
||||
}
|
||||
|
||||
impl<'a> CopyInReader<'a> {
|
||||
@@ -219,7 +223,8 @@ impl<'a> CopyInReader<'a> {
|
||||
fn new(pgb: &'a mut PostgresBackend) -> Self {
|
||||
Self {
|
||||
pgb,
|
||||
buf: VecDeque::<_>::new(),
|
||||
buf: Vec::<_>::new(),
|
||||
buf_begin: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -239,44 +244,44 @@ impl<'a> Read for CopyInReader<'a> {
|
||||
// TODO check if shutdown was requested?
|
||||
|
||||
// Return from buffer if nonempty
|
||||
if self.buf.len() > 0 {
|
||||
let bytes_read = std::cmp::min(buf.len(), self.buf.len());
|
||||
for i in 0..bytes_read {
|
||||
buf[i] = self.buf[i];
|
||||
}
|
||||
self.buf.drain(0..bytes_read);
|
||||
return Ok(bytes_read);
|
||||
if self.buf_begin < self.buf.len() {
|
||||
let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin);
|
||||
buf[..bytes_to_read].copy_from_slice(&self.buf[self.buf_begin..][..bytes_to_read]);
|
||||
self.buf_begin += bytes_to_read;
|
||||
return Ok(bytes_to_read);
|
||||
}
|
||||
|
||||
// Delete garbage
|
||||
self.buf.clear();
|
||||
self.buf_begin = 0;
|
||||
|
||||
// Wait for client to send CopyData bytes
|
||||
match self.pgb.read_message() {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => {
|
||||
return Ok(0)
|
||||
}
|
||||
FeMessage::CopyDone => return Ok(0),
|
||||
m => {
|
||||
info!("unexpected copy in client message {:?}", m);
|
||||
continue;
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
// Return as much as we can, saving the rest in self.buf
|
||||
let mut reader = copy_data_bytes.reader();
|
||||
let bytes_read = reader.read(buf)?;
|
||||
self.buf.extend(reader.bytes().map(|r| r.expect("error reading from copy in")));
|
||||
reader.read_to_end(&mut self.buf)?;
|
||||
return Ok(bytes_read);
|
||||
},
|
||||
}
|
||||
Ok(None) => {
|
||||
// Is this ok?
|
||||
return Ok(0)
|
||||
},
|
||||
return Ok(0);
|
||||
}
|
||||
Err(e) => {
|
||||
if !is_socket_read_timed_out(&e) {
|
||||
todo!("return io::Error");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -535,20 +540,21 @@ impl PageServerHandler {
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
base_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
_end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let _enter = info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
let _enter =
|
||||
info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
// TODO thread_mgr::associate_with?
|
||||
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
|
||||
let repartition_distance = repo.get_checkpoint_distance(); // TODO
|
||||
let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
|
||||
timeline, repartition_distance);
|
||||
let repartition_distance = repo.get_checkpoint_distance(); // TODO
|
||||
let mut datadir_timeline =
|
||||
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn
|
||||
// TODO mark timeline as not ready until it reaches end_lsn?
|
||||
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
@@ -568,14 +574,15 @@ impl PageServerHandler {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let _enter = info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
let _enter =
|
||||
info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
// TODO thread_mgr::associate_with?
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
let timeline = repo.get_timeline_load(timeline_id)?;
|
||||
let repartition_distance = repo.get_checkpoint_distance(); // TODO
|
||||
let mut datadir_timeline = DatadirTimeline::<LayeredRepository>::new(
|
||||
timeline, repartition_distance);
|
||||
let repartition_distance = repo.get_checkpoint_distance(); // TODO
|
||||
let mut datadir_timeline =
|
||||
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
|
||||
|
||||
// TODO ensure start_lsn matches current lsn
|
||||
|
||||
|
||||
Reference in New Issue
Block a user