sink&source

This commit is contained in:
Discord9
2023-08-10 18:11:22 +08:00
parent 2cf7d6d569
commit 76d8709774
12 changed files with 523 additions and 8 deletions

View File

@@ -11,6 +11,7 @@ use timely::progress::Timestamp;
use crate::compute::context::CollectionBundle;
use crate::compute::plan::Plan;
use crate::compute::types::BuildDesc;
use crate::compute::Context;
use crate::expr::Id;
use crate::repr::{self, Row};
@@ -53,6 +54,19 @@ impl RenderTimestamp for repr::Timestamp {
}
}
// This implementation block allows child timestamps to vary from parent timestamps.
impl<G> Context<G, Row>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Builds and registers a single dataflow object in this context.
    ///
    /// Renders `object.plan` into a collection bundle and binds it to the
    /// object's global id, so later objects in the same dataflow can refer
    /// to it by id.
    pub(crate) fn build_object(&mut self, object: BuildDesc<Plan>) {
        // First, transform the relation expression into a render plan.
        let bundle = self.render_plan(object.plan);
        // Make the rendered collection addressable under its global id.
        self.insert_id(Id::Global(object.id), bundle);
    }
}
impl<S> Context<S, Row>
where
S: Scope,
@@ -188,3 +202,32 @@ where
}
}
}
#[cfg(test)]
mod test {
    use differential_dataflow::input::InputSession;
    use timely::dataflow::scopes::Child;

    use super::*;
    use crate::expr::LocalId;
    use crate::repr::Diff;

    /// Smoke test: rendering a trivial constant plan inside a timely worker
    /// must complete without panicking.
    #[test]
    fn test_simple_plan_render() {
        timely::execute_from_args(std::env::args(), move |worker| {
            let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
            worker.dataflow(|scope: &mut Child<'_, _, repr::Timestamp>| {
                let mut test_ctx = Context::<_, Row, _>::for_dataflow_in(scope.clone());
                // A plan producing a single default row with diff 1 at time 0.
                let plan = Plan::Constant {
                    rows: Ok(vec![(Row::default(), 0, 1)]),
                };
                let input_collection = input.to_collection(scope);
                let err_collection = InputSession::new().to_collection(scope);
                let input_collection =
                    CollectionBundle::from_collections(input_collection, err_collection);
                // insert collection
                test_ctx.insert_id(Id::Local(LocalId(0)), input_collection);
                // The bundle itself is not inspected here; bind it with a
                // leading underscore to silence the unused-variable warning.
                let _bundle = test_ctx.render_plan(plan);
            });
        })
        // `execute_from_args` returns a `Result` that was previously dropped
        // silently; a failed worker launch should fail the test.
        .expect("timely computation failed to start");
    }
}

View File

@@ -100,11 +100,10 @@ where
}
});
// TODO(discord9): find out how to do `consolidate_stream` without Abomonation
// Demux out the potential errors from key and value selector evaluation.
let (ok, mut err) = key_val_input
.as_collection()
// .consolidate_stream()
.consolidate_stream()
.flat_map_fallible("OkErrDemux", Some);
err = err.concat(&err_input);

View File

@@ -0,0 +1,59 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use crate::compute::plan::Plan;
use crate::compute::types::sinks::ComputeSinkDesc;
use crate::compute::types::sources::SourceInstanceDesc;
use crate::expr::{GlobalId, ScalarExpr};
use crate::repr::{self, RelationType};
/// A description of a dataflow to construct and results to surface.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = repr::Timestamp> {
    /// Source instantiations made available to the dataflow, each paired with
    /// monotonicity information.
    // NOTE(review): the bool presumably means "is monotonic", mirroring
    // `index_imports` below — confirm against the writer of this field.
    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool)>,
    /// Indexes made available to the dataflow.
    /// (id of new index, description of index, relation type of base source/view, monotonic)
    pub index_imports: BTreeMap<GlobalId, (IndexDesc, RelationType, bool)>,
    /// Views and indexes to be built and stored in the local context.
    /// Objects must be built in the specific order, as there may be
    /// dependencies of later objects on prior identifiers.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes to be made available to be shared with other dataflows.
    /// (id of new index, description of index, relation type of base source/view)
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
    /// Sinks to be created.
    /// (id of new sink, description of sink)
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
    /// An optional frontier to which inputs should be advanced.
    ///
    /// If this is set, it should override the default setting determined by
    /// the upper bound of `since` frontiers contributing to the dataflow.
    /// It is an error for this to be set to a frontier not beyond that default.
    pub as_of: Option<Antichain<T>>,
    /// Frontier beyond which the dataflow should not execute.
    /// Specifically, updates at times greater or equal to this frontier are suppressed.
    /// This is often set to `as_of + 1` to enable "batch" computations.
    pub until: Antichain<T>,
    /// Human-readable name, used for debugging and logging.
    pub debug_name: String,
}
/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P = Plan> {
    /// The global id the built object will be registered under.
    pub id: GlobalId,
    /// The plan (or other representation `P`) to build.
    pub plan: P,
}
/// An index storing processed updates so they can be queried
/// or reused in other computations.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// Identity of the collection the index is on.
    pub on_id: GlobalId,
    /// Expressions to be arranged, in order of decreasing primacy.
    pub key: Vec<ScalarExpr>,
}

View File

@@ -1 +1,13 @@
use serde::{Deserialize, Serialize};
use crate::expr::GlobalId;
mod dataflow;
mod sinks;
mod sources;
/// An association of a global identifier to an expression.
// NOTE(review): a near-identical `BuildDesc` (with `P = Plan` as a default
// parameter) is also declared in the `dataflow` submodule — consider
// unifying the two definitions.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// The global id the built object will be registered under.
    pub id: GlobalId,
    /// The plan representation to build.
    pub plan: P,
}

View File

@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use crate::expr::GlobalId;
use crate::repr::{self, RelationDesc};
/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ComputeSinkDesc<S: 'static = (), T = repr::Timestamp> {
    /// Global id of the collection this sink reads from.
    pub from: GlobalId,
    /// Relation description of the collection being sunk.
    pub from_desc: RelationDesc,
    /// The connection used to write the collection out.
    pub connection: ComputeSinkConnection<S>,
    // NOTE(review): presumably whether the sink emits an initial snapshot of
    // the collection before streaming updates — confirm with the consumer.
    pub with_snapshot: bool,
    /// Frontier at which the sink stops emitting updates.
    pub up_to: Antichain<T>,
}
/// The kinds of connections a compute sink can write to.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ComputeSinkConnection<S: 'static = ()> {
    // TODO(discord9): consider if ever needed
    /// A subscription-style sink carrying no storage metadata.
    Subscribe,
    /// A sink that persists the collection using storage metadata `S`.
    Persist(PersistSinkConnection<S>),
}
/// Connection details for a persisting sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct PersistSinkConnection<S> {
    /// Relation description of the values being persisted.
    pub value_desc: RelationDesc,
    /// Storage-layer metadata describing where and how to persist.
    pub storage_metadata: S,
}

View File

@@ -0,0 +1,26 @@
use serde::{Deserialize, Serialize};
use crate::expr::MapFilterProject;
use crate::repr::RelationType;
/// A description of an instantiation of a source.
///
/// This includes a description of the source, but additionally any
/// context-dependent options like the ability to apply filtering and
/// projection to the records as they emerge.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SourceInstanceDesc<M> {
    /// Arguments for this instantiation of the source.
    pub arguments: SourceInstanceArguments,
    /// Additional metadata used by the storage client of a compute instance to read it.
    pub storage_metadata: M,
    /// The relation type of this source.
    pub typ: RelationType,
}
/// Per-source construction arguments.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SourceInstanceArguments {
    /// Linear operators (map/filter/project) to be applied record-by-record,
    /// if any.
    pub operators: Option<MapFilterProject>,
}

View File

@@ -5,7 +5,7 @@ use super::ScalarExpr;
// TODO(discord9): more function & eval
use crate::{repr::Row, storage::errors::EvalError};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum UnaryFunc {
Not,
IsNull,
@@ -19,7 +19,7 @@ impl UnaryFunc {
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum BinaryFunc {}
impl BinaryFunc {
@@ -33,7 +33,7 @@ impl BinaryFunc {
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum VariadicFunc {}
impl VariadicFunc {

View File

@@ -1,4 +1,6 @@
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum GlobalId {
/// System namespace.
System(u64),

View File

@@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
use crate::expr::func::{BinaryFunc, UnaryFunc, VariadicFunc};
use crate::storage::errors::EvalError;
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ScalarExpr {
/// A column of the input row
Column(usize),

View File

@@ -4,11 +4,14 @@ use std::borrow::Borrow;
use std::slice::SliceIndex;
use datatypes::value::Value;
pub(crate) use relation::{RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
/// System-wide Record count difference type.
pub type Diff = i64;
mod relation;
mod timestamp;
/// A row is a vector of values.
///
/// TODO(discord9): use a more efficient representation

View File

@@ -0,0 +1,342 @@
use datatypes::prelude::ConcreteDataType;
use serde::{Deserialize, Serialize};
/// The type of a relation.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
impl RelationType {
/// Constructs a `RelationType` representing the relation with no columns and
/// no keys.
pub fn empty() -> Self {
RelationType::new(vec![])
}
/// Constructs a new `RelationType` from specified column types.
///
/// The `RelationType` will have no keys.
pub fn new(column_types: Vec<ColumnType>) -> Self {
RelationType {
column_types,
keys: Vec::new(),
}
}
/// Adds a new key for the relation.
pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
indices.sort_unstable();
if !self.keys.contains(&indices) {
self.keys.push(indices);
}
self
}
pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
for key in keys {
self = self.with_key(key)
}
self
}
/// Computes the number of columns in the relation.
pub fn arity(&self) -> usize {
self.column_types.len()
}
/// Gets the index of the columns used when creating a default index.
pub fn default_key(&self) -> Vec<usize> {
if let Some(key) = self.keys.first() {
if key.is_empty() {
(0..self.column_types.len()).collect()
} else {
key.clone()
}
} else {
(0..self.column_types.len()).collect()
}
}
/// True if any collection described by `self` could safely be described by `other`.
///
/// In practice this means checking that the scalar types match exactly, and that the
/// nullability of `self` is at least as strict as `other`, and that all keys of `other`
/// contain some key of `self` (as a set of key columns is less strict than any subset).
pub fn subtypes(&self, other: &RelationType) -> bool {
let all_keys = other.keys.iter().all(|key1| {
self.keys
.iter()
.any(|key2| key1.iter().all(|k| key2.contains(k)))
});
if !all_keys {
return false;
}
if self.column_types.len() != other.column_types.len() {
return false;
}
for (col1, col2) in self.column_types.iter().zip(other.column_types.iter()) {
if col1.nullable && !col2.nullable {
return false;
}
if col1.scalar_type != col2.scalar_type {
return false;
}
}
true
}
}
/// The type of a `Value`.
///
/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, initialize the struct directly.
// NOTE(review): the original doc referenced a `ScalarType::nullable` helper
// that does not exist in this crate (the scalar type here is
// `ConcreteDataType`); the dangling intra-doc link has been removed.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct ColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ConcreteDataType,
    /// Whether this datum can be null.
    #[serde(default = "return_true")]
    pub nullable: bool,
}

/// This function exists solely for the purpose of making `ColumnType` nullable
/// by default when deserialized. The default value of a bool is false, and the
/// only way to make a serde field take on any other value by default is to
/// pass it a function that returns the desired default value. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
/// A description of the shape of a relation.
///
/// It bundles a [`RelationType`] with the name of each column in the relation.
/// Individual column names are optional.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```ignore
/// use mz_repr::{ColumnType, RelationDesc, ScalarType};
///
/// let desc = RelationDesc::empty()
///     .with_column("id", ScalarType::Int64.nullable(false))
///     .with_column("price", ScalarType::Float64.nullable(true));
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```ignore
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::RelationType { mz_repr::RelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// NOTE(review): the examples above still reference `mz_repr` types from the
/// upstream project and would not compile in this crate; they are marked
/// `ignore` until ported to the local type names.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct RelationDesc {
    /// The relation type: column types and keys.
    typ: RelationType,
    /// One entry per column, parallel to `typ.column_types`.
    names: Vec<ColumnName>,
}
impl RelationDesc {
    /// Constructs a new `RelationDesc` that represents the empty relation
    /// with no columns and no keys.
    pub fn empty() -> Self {
        Self {
            typ: RelationType::empty(),
            names: Vec::new(),
        }
    }

    /// Constructs a new `RelationDesc` from a `RelationType` and an iterator
    /// over column names.
    ///
    /// # Panics
    ///
    /// Panics if the arity of the `RelationType` is not equal to the number of
    /// items in `names`.
    pub fn new<I, N>(typ: RelationType, names: I) -> Self
    where
        I: IntoIterator<Item = N>,
        N: Into<ColumnName>,
    {
        let names: Vec<ColumnName> = names.into_iter().map(Into::into).collect();
        assert_eq!(typ.column_types.len(), names.len());
        RelationDesc { typ, names }
    }

    /// Constructs a `RelationDesc` from pairs of column name and column type.
    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
    where
        I: IntoIterator<Item = (N, T)>,
        T: Into<ColumnType>,
        N: Into<ColumnName>,
    {
        let mut names = Vec::new();
        let mut types = Vec::new();
        for (name, typ) in iter {
            names.push(name);
            types.push(typ.into());
        }
        Self::new(RelationType::new(types), names)
    }

    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
    pub fn concat(mut self, other: Self) -> Self {
        // Record the current width first: `other`'s key indices must be
        // shifted past the columns already present in `self`.
        let offset = self.typ.column_types.len();
        self.names.extend(other.names);
        self.typ.column_types.extend(other.typ.column_types);
        for key in other.typ.keys {
            let shifted: Vec<usize> = key.into_iter().map(|idx| idx + offset).collect();
            self = self.with_key(shifted);
        }
        self
    }

    /// Appends a column with the specified name and type.
    pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
    where
        N: Into<ColumnName>,
    {
        self.names.push(name.into());
        self.typ.column_types.push(column_type);
        self
    }

    /// Adds a new key for the relation.
    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
        self.typ = self.typ.with_key(indices);
        self
    }

    /// Drops all existing keys.
    pub fn without_keys(mut self) -> Self {
        self.typ.keys = Vec::new();
        self
    }

    /// Builds a new relation description with the column names replaced with
    /// new names.
    ///
    /// # Panics
    ///
    /// Panics if the arity of the relation type does not match the number of
    /// items in `names`.
    pub fn with_names<I, N>(self, names: I) -> Self
    where
        I: IntoIterator<Item = N>,
        N: Into<ColumnName>,
    {
        Self::new(self.typ, names)
    }

    /// Computes the number of columns in the relation.
    pub fn arity(&self) -> usize {
        self.typ.arity()
    }

    /// Returns the relation type underlying this relation description.
    pub fn typ(&self) -> &RelationType {
        &self.typ
    }

    /// Returns an iterator over the columns in this relation.
    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
        self.names.iter().zip(self.typ.column_types.iter())
    }

    /// Returns an iterator over the types of the columns in this relation.
    pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
        self.typ.column_types.iter()
    }

    /// Returns an iterator over the names of the columns in this relation.
    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
        self.names.iter()
    }

    /// Finds a column by name.
    ///
    /// Returns the index and type of the column named `name`. If no column with
    /// the specified name exists, returns `None`. If multiple columns have the
    /// specified name, the leftmost column is returned.
    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
        self.names
            .iter()
            .enumerate()
            .find(|(_, n)| *n == name)
            .map(|(i, _)| (i, &self.typ.column_types[i]))
    }

    /// Gets the name of the `i`th column.
    ///
    /// # Panics
    ///
    /// Panics if `i` is not a valid column index.
    pub fn get_name(&self, i: usize) -> &ColumnName {
        &self.names[i]
    }

    /// Mutably gets the name of the `i`th column.
    ///
    /// # Panics
    ///
    /// Panics if `i` is not a valid column index.
    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
        &mut self.names[i]
    }

    /// Gets the name of the `i`th column if that column name is unambiguous.
    ///
    /// If at least one other column has the same name as the `i`th column,
    /// returns `None`. If the `i`th column has no name, returns `None`.
    ///
    /// # Panics
    ///
    /// Panics if `i` is not a valid column index.
    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
        let name = &self.names[i];
        match self.names.iter().filter(|n| *n == name).count() {
            1 => Some(name),
            _ => None,
        }
    }
}
/// The name of a column in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct ColumnName(pub(crate) String);

impl ColumnName {
    /// Views this column name as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Returns a mutable reference to the string underlying this column name.
    pub fn as_mut_str(&mut self) -> &mut String {
        &mut self.0
    }
}

View File

@@ -0,0 +1 @@