Fix merge bugs

This commit is contained in:
Bojan Serafimov
2022-09-15 20:38:09 -04:00
parent 6913bedc09
commit a66a3b4ed8
3 changed files with 20 additions and 15 deletions

View File

@@ -10,8 +10,8 @@ use tokio::{io::AsyncWriteExt, net::TcpStream};
use clap::{App, Arg};
use utils::{
id::{TenantId, TimelineId},
pq_proto::{BeMessage, FeMessage},
zid::{ZTenantId, ZTimelineId},
};
// TODO put this in library, dedup with stuff in control_plane
@@ -23,8 +23,8 @@ struct PagestreamApi {
impl PagestreamApi {
async fn connect(
connstr: &str,
tenant: &ZTenantId,
timeline: &ZTimelineId,
tenant: &TenantId,
timeline: &TimelineId,
) -> anyhow::Result<PagestreamApi> {
// Parse connstr
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
@@ -104,17 +104,17 @@ async fn main() -> anyhow::Result<()> {
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let tenant_id = ZTenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let timeline_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// TODO The pageserver deletes existing traces?
// LOL yes because I use tenant ID as trace id

View File

@@ -360,9 +360,9 @@ impl PageServerConf {
pub fn trace_path(
&self,
tenant_id: &ZTenantId,
timeline_id: &ZTimelineId,
connection_id: &ZTimelineId, // TODO make a new type
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &TimelineId, // TODO make a new type
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())

View File

@@ -11,7 +11,8 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::Buf;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
@@ -473,11 +474,11 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let mut tracer = if repo.get_trace_read_requests() {
let path = repo
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &ZTimelineId::generate());
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
Some(Tracer::new(path))
} else {
None
@@ -507,6 +508,10 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Sync) => {
// TODO what now?
continue;
}
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
@@ -520,7 +525,7 @@ impl PageServerHandler {
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {