diff --git a/pageserver/src/bin/replay.rs b/pageserver/src/bin/replay.rs index fe714f5788..c1ef7f82af 100644 --- a/pageserver/src/bin/replay.rs +++ b/pageserver/src/bin/replay.rs @@ -10,8 +10,8 @@ use tokio::{io::AsyncWriteExt, net::TcpStream}; use clap::{App, Arg}; use utils::{ + id::{TenantId, TimelineId}, pq_proto::{BeMessage, FeMessage}, - zid::{ZTenantId, ZTimelineId}, }; // TODO put this in library, dedup with stuff in control_plane @@ -23,8 +23,8 @@ struct PagestreamApi { impl PagestreamApi { async fn connect( connstr: &str, - tenant: &ZTenantId, - timeline: &ZTimelineId, + tenant: &TenantId, + timeline: &TimelineId, ) -> anyhow::Result { // Parse connstr let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr"); @@ -104,17 +104,17 @@ async fn main() -> anyhow::Result<()> { for tenant_dir in read_dir(traces_dir)? { let entry = tenant_dir?; let path = entry.path(); - let tenant_id = ZTenantId::from_str(path.file_name().unwrap().to_str().unwrap())?; + let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?; for timeline_dir in read_dir(path)? { let entry = timeline_dir?; let path = entry.path(); - let timeline_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; + let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; for trace_dir in read_dir(path)? { let entry = trace_dir?; let path = entry.path(); - let _conn_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; + let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; // TODO The pageserver deletes existing traces? // LOL yes because I use tenant ID as trace id diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 45ad81ec22..0d1c064290 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -360,9 +360,9 @@ impl PageServerConf { pub fn trace_path( &self, - tenant_id: &ZTenantId, - timeline_id: &ZTimelineId, - connection_id: &ZTimelineId, // TODO make a new type + tenant_id: &TenantId, + timeline_id: &TimelineId, + connection_id: &TimelineId, // TODO make a new type ) -> PathBuf { self.traces_path() .join(tenant_id.to_string()) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index cb0963b188..be9dc28ed0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -11,7 +11,8 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{BigEndian, ReadBytesExt}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::Buf; +use bytes::{BufMut, Bytes, BytesMut}; use futures::{Stream, StreamExt}; use regex::Regex; use std::io; @@ -473,11 +474,11 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Make request tracer if needed - let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - let mut tracer = if repo.get_trace_read_requests() { - let path = repo + let tenant = tenant_mgr::get_tenant(tenant_id, true)?; + let mut tracer = if tenant.get_trace_read_requests() { + let path = tenant .conf - .trace_path(&tenant_id, &timeline_id, &ZTimelineId::generate()); + .trace_path(&tenant_id, &timeline_id, &TimelineId::generate()); Some(Tracer::new(path)) } else { None @@ -507,6 +508,10 @@ impl PageServerHandler { let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, + Some(FeMessage::Sync) => { + // TODO what now? + continue; + } Some(m) => { bail!("unexpected message: {m:?} during COPY"); } @@ -520,7 +525,7 @@ impl PageServerHandler { trace!("query: {copy_data_bytes:?}"); - let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; let response = match neon_fe_msg { PagestreamFeMessage::Exists(req) => {