feat: add bulk to wal Mutation

This commit is contained in:
evenyag
2025-02-03 23:23:17 +08:00
parent 7170120de6
commit 094d0fcdf5
13 changed files with 31 additions and 2 deletions

View File

@@ -276,6 +276,7 @@ impl CpuDataGenerator {
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(&self.metadata, mutation).unwrap()

View File

@@ -55,7 +55,7 @@ use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
pub struct BulkPart {
data: Bytes,
pub(crate) data: Bytes,
metadata: BulkPartMeta,
}

View File

@@ -394,6 +394,7 @@ mod tests {
sequence: START_SEQ,
rows: Some(rows),
write_hint: None,
bulk: Vec::new(),
}
}
@@ -432,6 +433,7 @@ mod tests {
sequence: 100,
rows: None,
write_hint: None,
bulk: Vec::new(),
};
let kvs = KeyValues::new(&meta, mutation);
assert!(kvs.is_none());

View File

@@ -731,6 +731,7 @@ mod tests {
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

View File

@@ -1186,6 +1186,7 @@ mod tests {
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(schema.as_ref(), mutation).unwrap()
}

View File

@@ -544,10 +544,12 @@ where
.as_ref()
.map(|rows| rows.rows.len())
.unwrap_or(0);
// TODO(yingwen): We need to support schema change as bulk may have different schema.
region_write_ctx.push_mutation(
mutation.op_type,
mutation.rows,
mutation.write_hint,
mutation.bulk,
OptionOutputTx::none(),
);
}

View File

@@ -136,6 +136,7 @@ impl RegionWriteCtx {
op_type: i32,
rows: Option<Rows>,
write_hint: Option<WriteHint>,
bulk: Vec<u8>,
tx: OptionOutputTx,
) {
let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
@@ -144,6 +145,7 @@ impl RegionWriteCtx {
sequence: self.next_sequence,
rows,
write_hint,
bulk,
});
let notify = WriteNotify::new(tx, num_rows);

View File

@@ -290,6 +290,7 @@ pub(crate) fn build_key_values_with_ts_seq_values(
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

View File

@@ -166,6 +166,7 @@ pub(crate) fn write_rows_to_version(
sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test.
rows: Some(rows),
write_hint: None,
bulk: Vec::new(),
};
let key_values = KeyValues::new(&version.metadata, mutation).unwrap();
version.memtables.mutable.write(&key_values).unwrap();

View File

@@ -288,6 +288,7 @@ mod tests {
sequence,
rows: Some(Rows { schema, rows }),
write_hint: None,
bulk: Vec::new(),
}
}

View File

@@ -176,7 +176,7 @@ pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048;
/// |
/// // may deadlock |
/// distributor.distribute().await; |
/// |
/// |
/// |
/// receivers[0].read().await |
/// ```
@@ -280,6 +280,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -294,6 +295,7 @@ mod tests {
sequence: 2u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -308,6 +310,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -352,6 +355,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
)]
@@ -372,6 +376,7 @@ mod tests {
sequence: 2u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
)]
@@ -388,6 +393,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let region2 = RegionId::new(1, 2);
@@ -397,6 +403,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let region3 = RegionId::new(1, 3);
@@ -406,6 +413,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let provider = Provider::kafka_provider("my_topic".to_string());
@@ -484,6 +492,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let region2 = RegionId::new(1, 2);
@@ -561,6 +570,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -575,6 +585,7 @@ mod tests {
sequence: 2u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -589,6 +600,7 @@ mod tests {
sequence: 3u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -603,6 +615,7 @@ mod tests {
sequence: 4u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
.encode_to_vec(),
@@ -638,6 +651,7 @@ mod tests {
sequence: 4u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
}
)]

View File

@@ -116,6 +116,7 @@ mod tests {
sequence: 1u64,
rows: None,
write_hint: None,
bulk: Vec::new(),
}],
};
let encoded_entry = wal_entry.encode_to_vec();

View File

@@ -246,10 +246,12 @@ impl<S> RegionWorkerLoop<S> {
}
// Collect requests by region.
// TODO(yingwen): Encode into bulk.
region_ctx.push_mutation(
sender_req.request.op_type as i32,
Some(sender_req.request.rows),
sender_req.request.hint,
Vec::new(),
sender_req.sender,
);
}