Overview
Organizing my own understanding of Flink/Spark joins.
Join algorithms
nested loop join, O(n^2)
for each tuple r in R do
    for each tuple s in S do
        if r and s satisfy the join condition then
            yield tuple <r,s>
credit: csdn
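A runnable Python sketch of the pseudocode above; the toy data and the condition lambda are illustrative, not from any framework:

def nested_loop_join(R, S, condition):
    for r in R:                      # O(|R| * |S|) comparisons
        for s in S:
            if condition(r, s):
                yield (r, s)

R = [(1, "a"), (2, "b")]
S = [(1, "x"), (3, "y")]
print(list(nested_loop_join(R, S, lambda r, s: r[0] == s[0])))
# [((1, 'a'), (1, 'x'))]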
index nested loop join, O(n log n)
for each tuple r in R do
    for each tuple s found by the index lookup on S do
        yield tuple <r,s>
reduces the lookup cost on S by using an index (credit: csdn)
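A runnable sketch, assuming an equi-join; a plain dict stands in for the B-tree/hash index on S:

from collections import defaultdict

def index_nested_loop_join(R, S, r_key, s_key):
    index = defaultdict(list)
    for s in S:                      # build the index on S once
        index[s_key(s)].append(s)
    for r in R:                      # one index lookup per tuple of R
        for s in index.get(r_key(r), []):
            yield (r, s)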
block nested loop join, scans S once per in-memory block of R
for each tuple r in R do
    add r to in_memory_hashtable
    if size(in_memory_hashtable) > max_memory_size
        scan_S_to_join
        in_memory_hashtable.clear()
final scan of S with the remaining hashtable
// assume |R| < |S|
// instead of scanning S once per record of R, load R in batches (blocks/pages),
// then iterate S once per batch to probe the loaded portion of R
// a special case of the classic hash join
reduces the number of scans over S by buffering R in memory (credit: csdn)
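A runnable sketch of the same idea; max_block_size stands in for the memory budget:

def block_nested_loop_join(R, S, r_key, s_key, max_block_size=1000):
    block, count = {}, 0

    def scan_S_to_join():            # one full pass over S per block of R
        for s in S:
            for r in block.get(s_key(s), []):
                yield (r, s)

    for r in R:
        block.setdefault(r_key(r), []).append(r)
        count += 1
        if count >= max_block_size:
            yield from scan_S_to_join()
            block.clear()
            count = 0
    if block:                        # final scan of S with the rest of R
        yield from scan_S_to_join()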
hash join
for each tuple r in R do
    add r to in_memory_hashtable // if memory is insufficient, degrade to block nested loop
for each tuple s in S do
    if s matches a tuple r in in_memory_hashtable then
        yield tuple <r,s>
credit: hive
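A runnable sketch of the classic in-memory version (build on the smaller R, probe with S):

def hash_join(R, S, r_key, s_key):
    table = {}
    for r in R:                      # build phase
        table.setdefault(r_key(r), []).append(r)
    for s in S:                      # probe phase, one pass over S
        for r in table.get(s_key(s), []):
            yield (r, s)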
grace hash join
// classic hash join + partitioning on the join key (spilled to disk),
// so that each partition fits in memory;
// and since the data is already partitioned by key, no extra shuffle is required
for row in t1:
    hashValue = hash_func(row)
    N = hashValue % PartCount
    write row to file t1_N
for row in t2:
    hashValue = hash_func(row)
    N = hashValue % PartCount
    write row to file t2_N
for i in range(0, PartCount):
    hash_join(t1_i, t2_i)
adds a partitioning pass (credit: hive)
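A runnable sketch; in-memory lists stand in for the t1_N/t2_N spill files of the pseudocode, reusing hash_join from the previous section:

def grace_hash_join(R, S, r_key, s_key, part_count=4):
    r_parts = [[] for _ in range(part_count)]
    s_parts = [[] for _ in range(part_count)]
    for r in R:                      # partition pass over R
        r_parts[hash(r_key(r)) % part_count].append(r)
    for s in S:                      # partition pass over S
        s_parts[hash(s_key(s)) % part_count].append(s)
    for i in range(part_count):      # join matching partition pairs
        yield from hash_join(r_parts[i], s_parts[i], r_key, s_key)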
hybrid hash join
partitioning + keep the first partition in memory to save one extra disk round trip
partition 0 gets a hot start (credit: hive)
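A minimal sketch of the idea, again reusing hash_join from above; partition 0 of R stays in memory and is probed on the fly:

def hybrid_hash_join(R, S, r_key, s_key, part_count=4):
    hot = {}                                 # partition 0 of R stays in memory
    r_parts = [[] for _ in range(part_count)]
    for r in R:
        n = hash(r_key(r)) % part_count
        if n == 0:
            hot.setdefault(r_key(r), []).append(r)
        else:
            r_parts[n].append(r)
    s_parts = [[] for _ in range(part_count)]
    for s in S:
        n = hash(s_key(s)) % part_count
        if n == 0:                           # probe the hot partition immediately
            for r in hot.get(s_key(s), []):
                yield (r, s)
        else:
            s_parts[n].append(s)
    for i in range(1, part_count):           # grace-style join for the rest
        yield from hash_join(r_parts[i], s_parts[i], r_key, s_key)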
symmetric hash join
designed for streaming: both inputs build hash tables and probe each other incrementally
credit: jdon
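A minimal sketch: every arriving tuple is inserted into its own side's hash table and probes the other side's table, so results stream out incrementally; the ("L"/"R", row) tagging is illustrative:

def symmetric_hash_join(tagged_stream, key):
    tables = {"L": {}, "R": {}}
    for side, row in tagged_stream:
        k = key(row)
        tables[side].setdefault(k, []).append(row)
        other = "R" if side == "L" else "L"
        for match in tables[other].get(k, []):
            yield (row, match) if side == "L" else (match, row)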
sort merge join
// assume S is the larger input
// 1. sort R and S on the join keys
// 2. merge with two cursors
r <- next(sortedR), s <- next(sortedS)
while r and s:
    if key(r) > key(s): s <- next(sortedS)
    elif key(r) < key(s): r <- next(sortedR)
    else:  // join keys match
        yield tuple <r,s>
        s <- next(sortedS)
sort first, then each merge step advances a cursor instead of rescanning from the beginning (credit: cnblogs)
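A runnable sketch of the merge phase; like the pseudocode above it assumes no duplicate keys on the R side (duplicates on both sides would need a rewind to the start of the current key group):

def sort_merge_join(R, S, r_key, s_key):
    R, S = sorted(R, key=r_key), sorted(S, key=s_key)
    i = j = 0
    while i < len(R) and j < len(S):
        if r_key(R[i]) < s_key(S[j]):
            i += 1
        elif r_key(R[i]) > s_key(S[j]):
            j += 1
        else:                        # keys match
            yield (R[i], S[j])
            j += 1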
P.S. One advantage of big-data frameworks is that data comes out of a shuffle naturally sorted, which favors sort-merge join; this is part of why they do so well at scale. The core idea is still divide and conquer: after partitioning, joining a single partition is like running SQLite on an ordinary machine.
Classification
flink
table join
- based on Calcite
- generator
- TABLE_EXEC_DISABLED_OPERATORS
- planner
- OperatorType.SortMergeJoin, JoinType.INNER
- PlannerBase -> ExecNodeGraphGenerator -> translateToPlan
- translateToPlanInternal -> HashJoinOperator/SortMergeJoinOperator
- runner
- executor JoinOperator
dataset join
- set joinHint(BROADCAST_HASH, REPARTITION_SORT_MERGE)
- set joinType(INNER, OUTER)
- compileJobGraph().preVisit() -> JoinNode()
- switch (joinHint) -> choose broadcast-hash or merge-sort
- richFlatJoinFunc()
stream join
- unbounded
symmetric hash join
- bounded/window/interval
- union the two streams so they behave like a single stream
unionStream = taggedInput1.union(taggedInput2)
- apply richFlatJoinFunc() to the unioned stream (see the sketch below)
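A toy sketch of the union idea: tag elements with their source, union into one stream, group by (window, key), and flat-join inside each group; window_of and the tags are stand-ins, not Flink APIs:

from collections import defaultdict

def windowed_union_join(stream1, stream2, key, window_of):
    groups = defaultdict(lambda: ([], []))   # (window, key) -> (left, right)
    union = [("1", e) for e in stream1] + [("2", e) for e in stream2]
    for tag, element in union:
        left, right = groups[(window_of(element), key(element))]
        (left if tag == "1" else right).append(element)
    for (window, k), (left, right) in groups.items():
        for l in left:                       # the flat-join over each pane
            for r in right:
                yield (window, l, r)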
spark
table join
dataset join
stream join
- unbounded
- buffer past input as streaming state
- limit the state using watermarks (see the PySpark sketch below)
- bounded/window/interval
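A hedged PySpark sketch modeled on the Structured Streaming docs; it assumes `impressions` and `clicks` are streaming DataFrames created elsewhere via spark.readStream, and the column names are illustrative:

from pyspark.sql import functions as F

# watermarks bound how much past input each side must keep as state
impressions_wm = impressions.withWatermark("impressionTime", "2 hours")
clicks_wm = clicks.withWatermark("clickTime", "3 hours")

joined = impressions_wm.join(
    clicks_wm,
    F.expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """),
)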
Realtime Query on Stream
How to run real-time queries on a stream:
- Flink uses a dynamic table
stream -> dynamic table (DT) -> query on DT -> new DT -> stream (credit: flink)
Liz is the latest record, i.e., (Mary, /home) comes first (credit: flink)
- Spark uses an unbounded table
newly arrived stream records are appended to the table; each query trigger is equivalent to executing the SQL over this table (credit: spark)
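A toy sketch of the unbounded-table model (names illustrative): each trigger appends the new rows and re-runs the same query over the whole table:

table = []

def on_trigger(new_rows, query):
    table.extend(new_rows)           # the stream appends to the table
    return query(table)              # the query runs over the full table

count_by_user = lambda rows: {u: rows.count(u) for u in set(rows)}
print(on_trigger(["Mary", "Bob"], count_by_user))   # {'Mary': 1, 'Bob': 1}
print(on_trigger(["Mary"], count_by_user))          # {'Mary': 2, 'Bob': 1}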
Reference
- Peeking into Apache Flink’s Engine Room, dataset only
- Flink使用Broadcast State实现流处理配置实时更新
- The Broadcast State Pattern
- Joining
- FLINK入门——JOIN整理
- Broadcast “JOIN” in Flink
- 学习Mysql的join算法:Index Nested-Loop Join和Block Nested-Loop Join
- CMU join algorithms
- Spark Streaming vs. Structured Streaming
- Structured Streaming VS Flink on Zhihu