在Windows下面的虚拟机Ubuntu上面试跑了一下druid,主要包括安装,配置,写数据,查数据,
Overview
Apache Druid是一个OLAP查询引擎,能对历史
和实时
数据提供亚秒级别的查询,提供低延时的数据写入,灵活的数据探索分析,高性能的数据聚合。
架构与节点类型
Architecture
Architecture2
Architecture3
- Historical:从
Deep storage
加载并提供Segment文件供数据查询 - MiddleManager:主要用于从外部数据源读取数据,并
写入
到druid以完成segment分配 - Broker:代理节点接收来自外部client的查询请求,并转发这些请求给
historical
和middlemanager
。当代理节点接收到结果时,将来自historical和middlemanager的结果merge然后返回给调用方。为了知道整个拓扑结构,代理节点通过使用Zookeeper在确定哪些historical和middlemanager存活 - Coordinator:协调节点对
historical
的分组进行监控,以确保数据可用,和最佳的配置。协调节点通过从元数据存储中读取元数据信息来判断哪些segments是应该加载到集群的,使用Zookeeper去判断哪些历史节点是存活的,在Zookeeper中创建任务条目告诉历史节点去加载和删除segments - Overlord:监控
MiddleManagers
和负责接收任务,协调和分配任务,为任务创建锁,并返回任务状态给任务发送方 Router:当集群规模很大时,主要负责将查询请求路由到不同的Broker节点上Indexing Service:索引服务节点由多个worker组成的集群,负责为加载批量的和实时的数据创建索引,并且允许对已经存在的数据进行修改Realtime(Deprecated):实时节点负责加载实时的数据到系统中,在生产使用的几个限制成本上实时节点比索引服务节点更容易搭建
数据格式
timeseries database data format
- DataSource:Druid 的基本数据结构,在逻辑上可以理解为关系型数据库中的表。它包含
时间
、维度
和指标
三列,- Timestamp列:我们将timestamp区别开是因为我们所有的查询都以
时间
为中心 - Dimension列:Dimensions对应事件的维度,通常用于筛选
过滤
数据。 在我们例子中的数据有四个dimensions: publisher, advertiser, gender, and country。 它们每一个都可以看作是我们已选都数据的主体 - Metric列:Metrics是用于
聚合
和计算的列。在我们的例子中,click和price就是metrics。 Metrics通常是数字,并且包含支持count、sum、mean等计算操作。 在OLAP的术语中也被叫做measures。
- Timestamp列:我们将timestamp区别开是因为我们所有的查询都以
- Segment:Druid 用来存储
索引
的数据格式,不同的索引按照时间跨度来分区,分区可通过 segmentGranularity(划分索引的时间粒度)进行配置
写入过程
- Committing Data(数据提交前)
一个datasource可以由一个或者成千上万个segments组成。每个segment从被middlemanager创建开始,此时是可变
mutable
和未提交uncommitted的。为了创建一个紧凑且支持快速查询(倒排)的segment,需要以下创建过程,- 转换为列式存储(columnar format)
- 位图索引,位图压缩(bitmap)
- 压缩(RLC)
- 字符串编码(mapping dict)
- 类型感知压缩
- Committed Data(数据提交过程)
segment会被定期刷到deep storage(overtim or oversize),flush后就变得
immutable
不可变了,同时从middlemanager迁移到historical。关于这个flush segment的入口信息会被写到metadata。入口信息是segment的自描述,包括segment的schema,size,存放在deep storage的位置等,这个信息被coordinator从metadata(mysql, PostgreSQL)获取,从而定位具体数据。
查询过程
-
Query Basic Flow(基本流程) Queries请求首先进入broker,此时broker会去historical和middlemanager里面找到包含Queries的segments,然后发送一个rewritten subquery到historical和middlemanager,这两节点接收并执行subquery,接着各自返回结果到broker,之后broker将结果merge并返回
-
Optimization Method(查询优化)
- 只获取关于该Query的segment
- 对于每个segment,使用索引去判别哪些行是需要的
- 当知道了所属行之后,利用列式存储去只读相关列,而不用将整行都读取
Version
- Ubuntu 16.04.5 LTS
- druid-0.12.3-bin.tar.gz
- zookeeper-3.4.10.tar.gz
- tutorial-examples.tar.gz
Install
# install druid
tar -xzf druid-0.12.3-bin.tar.gz
tar zxvf tutorial-examples.tar.gz
# install zk
tar -xzf zookeeper-3.4.10.tar.gz
# configure
cd zookeeper-3.4.10
cp conf/zoo_sample.cfg conf/zoo.cfg
./bin/zkServer.sh start
cd druid-0.12.3
bin/init
# 这里没有采用0.12.3官方提供的java -cp命令,而采用了0.9.0提供的。
java `cat conf-quickstart/druid/coordinator/jvm.config | xargs` -cp conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/* io.druid.cli.Main server coordinator
java `cat conf-quickstart/druid/overlord/jvm.config | xargs` -cp conf-quickstart/druid/_common:conf-quickstart/druid/overlord:lib/* io.druid.cli.Main server overlord
java `cat conf-quickstart/druid/historical/jvm.config | xargs` -cp conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/* io.druid.cli.Main server historical
java `cat conf-quickstart/druid/middleManager/jvm.config | xargs` -cp conf-quickstart/druid/_common:conf-quickstart/druid/middleManager:lib/* io.druid.cli.Main server middleManager
java `cat conf-quickstart/druid/broker/jvm.config | xargs` -cp conf-quickstart/druid/_common:conf-quickstart/druid/broker:lib/* io.druid.cli.Main server broker
bootstrap
Index data
try to insert data into druid.
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-index.json http://localhost:8090/druid/indexer/v1/task
examples/wikipedia-index.json
{
"type" : "index",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec": {
"column": "time",
"format": "iso"
}
}
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "index",
"firehose" : {
"type" : "local",
"baseDir" : "quickstart/",
"filter" : "wikiticker-2015-09-12-sampled.json.gz"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index",
"targetPartitionSize" : 5000000,
"maxRowsInMemory" : 25000,
"forceExtendableShardSpecs" : true
}
}
}
Index data
Query data
Build-in Queries
try to query data from druid.
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-top-pages.json http://localhost:8082/druid/v2?pretty
wikipedia-top-pages.json
{
"queryType" : "topN",
"dataSource" : "wikipedia",
"intervals" : ["2015-09-12/2015-09-13"],
"granularity" : "all",
"dimension" : "page",
"metric" : "count",
"threshold" : 10,
"aggregations" : [
{
"type" : "count",
"name" : "count"
}
]
}
build-in TopN
SQL Queries
上面topN是druid内置的TopN queries。而通过上述配置,sql是默认没有开启的。如果需要开启sql,需要在broker/common下面添加druid.sql.enable=true
。官网是在examples/conf/druid/_common/common.runtime.properties
下面默认开启的。而我们这边是通过conf-quickstart/druid/broker/runtime.properties
启动的,两种方式皆可,取决于druid.sql.enable=true
是否被识别而已。
Sql TopN
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-top-pages-sql.json http://localhost:8082/druid/v2/sql
wikipedia-top-pages-sql.json
{
"query":"SELECT page, COUNT(*) AS Edits FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY page ORDER BY Edits DESC LIMIT 10"
}
sql TopN
Timeseries
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-timeseries-sql.json http://localhost:8082/druid/v2/sql
wikipedia-timeseries-sql.json
{
"query":"SELECT FLOOR(__time to HOUR) AS HourTime, SUM(deleted) AS LinesDeleted FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY FLOOR(__time to HOUR)"
}
timeseries
GroupBy
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-groupby-sql.json http://localhost:8082/druid/v2/sql
wikipedia-groupby-sql.json
{
"query":"SELECT channel, SUM(added) FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY channel ORDER BY SUM(added) DESC LIMIT 5"
}
groupBy
Scan
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-scan-sql.json http://localhost:8082/druid/v2/sql
wikipedia-scan-sql.json
{
"query":"SELECT user, page FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 02:00:00' AND TIMESTAMP '2015-09-12 03:00:00' LIMIT 5"
}
scan
Explain
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-explain-top-pages-sql.json http://localhost:8082/druid/v2/sql
wikipedia-explain-top-pages-sql.json
{
"query":"EXPLAIN PLAN FOR SELECT page, COUNT(*) AS Edits FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY page ORDER BY Edits DESC LIMIT 10"
}
explain plan
data migration
有时候当旧集群不足以再存放新数据的时候,一方面可以对旧集群扩容,另一方面可以新开一个集群。而新开一个集群就需要将历史数据导过来,不然search就缺了旧数据,一般步骤如下,
- 切流。切换新数据到新集群
- 搬移。将旧数据搬移到新集群
旧数据搬移到新集群
在druid的具体操作是,
- 将具体的segments从srcPath直接cp到destPath
hadoop distcp hdfs://nn1:8020/src/path/to/segment/file hdfs://nn1:8020/dest/path/to/segment/file
- 在新的destPath下面为segments建立新的metadata
java \ -cp "/home/chenfh5/project/druid/druid-0.12.3/lib/*" \ -Ddruid.metadata.storage.type=mysql \ -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid2 \ -Ddruid.metadata.storage.connector.user=yourname\ -Ddruid.metadata.storage.connector.password=yourpwd\ -Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"] \ -Ddruid.storage.type=hdfs \ io.druid.cli.Main tools insert-segment-to-db --workingDir hdfs://nn1:8020/dest/path/to/segment/ --updateDescriptor true
如果
mysql-metadata-storage
不在extensions文件夹下,就到官网下载一个,然后解压过去。
双集群迁移测试
为了在本地虚拟上部署双集群,因为用到insert-segment-to-db
这个tool,所以,
- 修改-Ddruid.metadata.storage为默认db(Derby )
- 修改默认metadata.storage为mysql,使其适配-Ddruid.metadata.storage
在
install
章节里面,看到java
cat conf-quickstart/druid/coordinator/jvm.config | xargs-cp conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/* io.druid.cli.Main server coordinator
是通过conf-quickstart/druid/_common
来启动的,其下的properties是,
# For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over):
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
druid.metadata.storage.connector.host=localhost
druid.metadata.storage.connector.port=1527
改为,For MySQL:
druid.extensions.loadList=["mysql-metadata-storage"]
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://db.example.com:3306/druid
druid.metadata.storage.connector.user=...
druid.metadata.storage.connector.password=...
另外,修改为mysql
之后,可能会出现Table doesn't exist
的情况,网上有解决方式。
然后就可以启动了。
cluster1
启动cluster;insert wikipedia数据集
cluster2
cat conf-quickstart/druid/*/runtime.properties | grep -C2 port
看到五大角色已占用端口,修改这里,使第二集群不会产生端口冲突。
默认集群1端口
集群2端口
cp -r /home/chenfh5/project/druid/druid-0.12.3/var/druid/segments/wikipedia/ /home/chenfh5/project/druid/druid-0.12.3.bak/var/druid/segments/
insert-segment-to-db
check
curl -XGET http://localhost:9081/druid/coordinator/v1/metadata/datasources?full
delete data
# marked as "unused"
curl -XDELETE http://localhost:8081/druid/coordinator/v1/datasources/{dataSourceName}
curl -XDELETE http://localhost:8081/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}
`interval = 2016-06-27_2016-06-28` (_)
# run Kill Task will delete any "unused" segments
curl -X 'POST' -H 'Content-Type:application/json' http://localhost:8090/druid/indexer/v1/task -d'{
"type": "kill",
"dataSource": "deletion-tutorial",
"interval" : "2015-09-12/2015-09-13"
}'
`interval = 2015-09-12/2015-09-13` (/)