Join implementation and classification in big data processing frameworks


Organizing my own understanding of joins in Flink/Spark.

Join algorithms

nested loop join, O(n^2)

for each tuple r in R do
    for each tuple s in S do
        if r and s satisfy the join condition then
            yield tuple <r,s>


credit: csdn
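The pseudocode above maps directly onto runnable code; a minimal Python sketch (table contents are made up for illustration):

```python
# Nested loop join: compare every tuple of R against every tuple of S.
# O(|R| * |S|) comparisons; needs no index and no extra memory.
def nested_loop_join(R, S, cond):
    out = []
    for r in R:
        for s in S:
            if cond(r, s):  # the join condition is an arbitrary predicate
                out.append((r, s))
    return out
```

Because the condition is an arbitrary predicate, this variant also handles non-equi (theta) joins, which the hash-based variants below cannot.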

index nested loop join, O(nlogn)

for each tuple r in R do
    for each tuple s matching r in the index on S do
        yield tuple <r,s>


the index reduces the lookup time in S, credit: csdn
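A sketch of the same idea, with a hash table standing in for the database index (with a B-tree index the per-probe cost would be O(log n), matching the heading):

```python
from collections import defaultdict

# Index nested loop join: build (or reuse) an index on S keyed by the join
# key, so each outer tuple probes the index instead of scanning all of S.
def index_nested_loop_join(R, S, key_r, key_s):
    index = defaultdict(list)        # stands in for a database index on S
    for s in S:
        index[key_s(s)].append(s)
    out = []
    for r in R:
        for s in index[key_r(r)]:    # index lookup replaces the inner scan
            out.append((r, s))
    return out
```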

block nested loop join

for each tuple r in R do
    add r to in_memory_hashtable
    if size(hashtable) > max_memory_size then
        scan S, probe the hashtable, yield matches, then clear it
do a final scan of S with the remaining hashtable

// assume R is smaller than S
// instead of scanning S once per record of R, buffer R in batches
// (blocks/pages), then iterate S once per batch to probe the buffered tuples.

// A special-case of the classic hash join.


buffering reduces the number of scans over S, credit: csdn
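The comments above can be turned into a runnable sketch; `max_block` stands in for the available memory (an assumption for illustration):

```python
from collections import defaultdict

# Block nested loop join: buffer a block of R in a hash table, then scan S
# once per block. S is scanned ceil(|R| / max_block) times instead of |R| times.
def block_nested_loop_join(R, S, key_r, key_s, max_block=2):
    out, block, count = [], defaultdict(list), 0

    def scan_s():
        for s in S:
            for r in block.get(key_s(s), []):
                out.append((r, s))
        block.clear()

    for r in R:
        block[key_r(r)].append(r)
        count += 1
        if count >= max_block:   # "hashtable exceeded memory": flush via a scan of S
            scan_s()
            count = 0
    if count:
        scan_s()                 # final scan of S with the remaining buffered tuples
    return out
```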

hash join

for each tuple r in R do
    add r to in_memory_hashtable // if memory is insufficient, degrade to block nested loop
for each tuple s in S do
    if s matches a tuple r in the hashtable then
        yield tuple <r,s>


credit: hive
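Filled out as a runnable sketch, building on the smaller side R and probing with S:

```python
from collections import defaultdict

# Classic in-memory hash join: build a hash table over R (the smaller input),
# then stream S through it exactly once. O(|R| + |S|) for equi-joins.
def hash_join(R, S, key_r, key_s):
    table = defaultdict(list)
    for r in R:                      # build phase
        table[key_r(r)].append(r)
    out = []
    for s in S:                      # probe phase
        for r in table.get(key_s(s), []):
            out.append((r, s))
    return out
```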

grace hash join

// plain hash join + hash-partitioning on the join key (spilled to disk),
// so that each partition fits in memory;
// since both sides are partitioned identically, each partition pair
// can be joined independently, with no further shuffle required

for row in t1:
    hashValue = hash_func(row)
    N = hashValue % PartCount;
    write row to file t1_N;

for row in t2:
    hashValue = hash_func(row)
    N = hashValue % PartCount;
    write row to file t2_N;

for i in range(0, PartCount):
    hash_join(t1_i, t2_i)


add partition, credit: hive
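A runnable sketch of the loops above, with in-memory lists standing in for the on-disk partition files t1_N / t2_N:

```python
# Grace hash join: hash-partition both inputs by the join key, then run an
# in-memory hash join per partition pair. Equal keys always hash to the same
# partition number, so each pair can be joined independently.
PART_COUNT = 4

def _partition(rows, key):
    parts = [[] for _ in range(PART_COUNT)]   # stands in for files t_0..t_N
    for row in rows:
        parts[hash(key(row)) % PART_COUNT].append(row)
    return parts

def grace_hash_join(t1, t2, key1, key2):
    out = []
    for p1, p2 in zip(_partition(t1, key1), _partition(t2, key2)):
        table = {}
        for r in p1:                          # build on the t1 partition
            table.setdefault(key1(r), []).append(r)
        for s in p2:                          # probe with the t2 partition
            out.extend((r, s) for r in table.get(key2(s), []))
    return out
```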

hybrid hash join

partitioning + keep the first partition in memory to avoid one extra disk round trip


partition0 hot start, credit: hive
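A sketch of the "partition 0 hot start" idea; the spill lists stand in for disk files:

```python
# Hybrid hash join: like grace hash join, but the build side's partition 0
# stays in memory, so probe tuples hashing to 0 join immediately and never
# touch disk; only the remaining partitions need a second pass.
PARTS = 3

def hybrid_hash_join(R, S, key_r, key_s):
    mem = {}                                  # partition 0 of R, kept in memory
    r_spill = [[] for _ in range(PARTS - 1)]  # stands in for on-disk files
    for r in R:
        p = hash(key_r(r)) % PARTS
        if p == 0:
            mem.setdefault(key_r(r), []).append(r)
        else:
            r_spill[p - 1].append(r)
    out = []
    s_spill = [[] for _ in range(PARTS - 1)]
    for s in S:
        p = hash(key_s(s)) % PARTS
        if p == 0:                            # hot path: join on the fly
            out.extend((r, s) for r in mem.get(key_s(s), []))
        else:
            s_spill[p - 1].append(s)
    for rp, sp in zip(r_spill, s_spill):      # second pass: spilled partitions
        table = {}
        for r in rp:
            table.setdefault(key_r(r), []).append(r)
        for s in sp:
            out.extend((r, s) for r in table.get(key_s(s), []))
    return out
```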

symmetric hash join

designed for streaming: both inputs maintain a hash table; each arriving tuple
first probes the other side's table, then is inserted into its own


credit: jdon
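Since this section has no pseudocode, here is a minimal sketch of the idea: probe the opposite side's table, then insert into your own, so results are produced incrementally as tuples arrive (class and method names are made up for illustration):

```python
from collections import defaultdict

# Symmetric hash join: one hash table per side. Arriving tuples probe the
# other side's table (emitting any matches) before joining their own table,
# so the join never blocks waiting for either input to finish.
class SymmetricHashJoin:
    def __init__(self, key_left, key_right):
        self.key = {"L": key_left, "R": key_right}
        self.table = {"L": defaultdict(list), "R": defaultdict(list)}

    def on_tuple(self, side, t):
        other = "R" if side == "L" else "L"
        k = self.key[side](t)
        matches = [(t, o) if side == "L" else (o, t)
                   for o in self.table[other][k]]   # probe the other side
        self.table[side][k].append(t)               # then insert into own side
        return matches
```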

sort merge join

// S is bigger
// 1. sort R, S on join keys
// 2. merge

r <- first tuple of sorted R, s <- first tuple of sorted S
while r and s:
    if r > s on the join key: advance s
    elif r < s on the join key: advance r
    else:  # join keys match
        yield tuple <r,s>
        advance s


after sorting, both iterators only move forward — the merge never restarts from the beginning, credit: cnblogs
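A runnable version of the merge, extended to handle duplicate join keys (advancing one side at a time, as in the pseudocode above, would miss many-to-many matches):

```python
# Sort-merge join: sort both inputs on the join key, then advance two cursors.
# Equal-key runs on each side are joined pairwise to handle duplicates.
def sort_merge_join(R, S, key_r, key_s):
    R, S = sorted(R, key=key_r), sorted(S, key=key_s)
    out, i, j = [], 0, 0
    while i < len(R) and j < len(S):
        kr, ks = key_r(R[i]), key_s(S[j])
        if kr < ks:
            i += 1
        elif kr > ks:
            j += 1
        else:
            # collect the run of equal keys on each side, join them pairwise
            i2 = i
            while i2 < len(R) and key_r(R[i2]) == kr:
                i2 += 1
            j2 = j
            while j2 < len(S) and key_s(S[j2]) == ks:
                j2 += 1
            out.extend((r, s) for r in R[i:i2] for s in S[j:j2])
            i, j = i2, j2
    return out
```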

P.S. One advantage of big data processing frameworks is that data comes out of the shuffle already sorted, which plays to sort-merge join's strength — part of why they do so well at scale. The core idea is still divide and conquer: once partitioned, joining a single partition is no harder than running SQLite on an ordinary machine.


table join

  1. based on Calcite
  2. generator
  3. planner
    • OperatorType.SortMergeJoin, JoinType.INNER
    • PlannerBase -> ExecNodeGraphGenerator -> translateToPlan
    • translateToPlanInternal -> HashJoinOperator/SortMergeJoinOperator
  4. runner
    • executor JoinOperator

dataset join

  2. set joinType(INNER, OUTER)
  3. compileJobGraph().preVisit() -> JoinNode()
  4. switch (joinHint) -> choose broadcast-hash or merge-sort
  5. richFlatJoinFunc()

stream join

  • unbounded
    • symmetric hash join
  • bounded/window/interval
    1. union the two streams so they behave like a single stream
      • unionStream = taggedInput1.union(taggedInput2)
    2. apply richFlatJoinFunc() to the unioned stream
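Steps 1–2 can be sketched in plain Python, with tumbling windows over integer timestamps (the tag and function names are illustrative, not Flink's actual classes):

```python
from collections import defaultdict

# Window join via tagged union: tag each element with its source stream,
# union into one sequence, bucket by tumbling window, then join the two
# sides inside each window.
def windowed_join(stream1, stream2, window_size, key):
    # step 1: tag and union; each input element is (timestamp, value)
    tagged = [("S1", t, v) for t, v in stream1] + [("S2", t, v) for t, v in stream2]
    windows = defaultdict(lambda: {"S1": [], "S2": []})
    for tag, t, v in tagged:
        windows[t // window_size][tag].append(v)
    # step 2: apply the join function within each window
    out = []
    for w in sorted(windows):
        for a in windows[w]["S1"]:
            for b in windows[w]["S2"]:
                if key(a) == key(b):
                    out.append((w, a, b))
    return out
```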


table join

dataset join

stream join

  • unbounded
    1. buffer past input as streaming state
    2. limit the state using watermarks
  • bounded/window/interval
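The two unbounded-join points above can be combined into one sketch: a symmetric hash join whose buffered state is pruned by a watermark, assuming matches must lie within `max_lag` time units of each other (that bound is what makes state cleanup safe; the class and method names are made up for illustration):

```python
# Unbounded stream join: buffer past input per side as state, keyed by the
# join key; a watermark evicts tuples too old to ever match future input.
class StreamJoin:
    def __init__(self, max_lag):
        self.max_lag = max_lag
        self.state = {"L": {}, "R": {}}   # side -> key -> [(ts, value), ...]

    def on_tuple(self, side, ts, key, value):
        other = "R" if side == "L" else "L"
        matches = [(value, v) if side == "L" else (v, value)
                   for t, v in self.state[other].get(key, [])
                   if abs(t - ts) <= self.max_lag]        # probe the other side
        self.state[side].setdefault(key, []).append((ts, value))
        return matches

    def on_watermark(self, wm):
        # tuples older than wm - max_lag can never match future input: drop them
        for side in ("L", "R"):
            for k in list(self.state[side]):
                kept = [(t, v) for t, v in self.state[side][k]
                        if t >= wm - self.max_lag]
                if kept:
                    self.state[side][k] = kept
                else:
                    del self.state[side][k]
```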

Realtime Query on Stream


  • Flink uses a dynamic table

    stream -> dynamic table(DT) -> query on DT -> new DT -> stream, credit: flink


    Liz is the latest record, i.e., (Mary, /home) comes first, credit: flink

  • Spark uses an unbounded table

    newly arrived stream records are appended to a table; each query trigger is equivalent to executing the SQL on that table, credit: spark


    credit: spark


  1. Peeking into Apache Flink’s Engine Room, dataset only
  2. Flink使用Broadcast State实现流处理配置实时更新
  3. The Broadcast State Pattern
  4. Joining
  5. FLINK入门——JOIN整理
  6. Broadcast “JOIN” in Flink
  7. 学习Mysql的join算法:Index Nested-Loop Join和Block Nested-Loop Join
  8. cmu join algorithms
  9. Spark Streaming vs. Structured Streaming
  10. Structured Streaming VS Flink in zhihu