Druid 0.12.3 Trial

 

I gave Druid a trial run on an Ubuntu VM under Windows, covering installation, configuration, writing data, and querying data.

Overview

Apache Druid is an OLAP query engine that provides sub-second queries over historical and real-time data, low-latency data ingestion, flexible data exploration and analysis, and high-performance data aggregation.

Architecture and Node Types

(Figures: Architecture, Architecture2, Architecture3)

  • Historical: loads Segment files from Deep storage and serves them for queries
  • MiddleManager: reads data from external data sources and writes it into Druid, handling segment allocation
  • Broker: receives query requests from external clients and forwards them to the Historical and MiddleManager nodes. When the results come back, the Broker merges the partial results from the Historicals and MiddleManagers and returns them to the caller. To learn the cluster topology, the Broker uses Zookeeper to determine which Historicals and MiddleManagers are alive
  • Coordinator: monitors the Historical group to ensure that data is available and optimally configured. It reads metadata from the metadata store to decide which segments should be loaded into the cluster, uses Zookeeper to determine which Historical nodes are alive, and creates task entries in Zookeeper to tell Historical nodes to load or drop segments
  • Overlord: monitors the MiddleManagers and is responsible for accepting tasks, coordinating and distributing them, creating locks for tasks, and returning task status to the task submitter
  • Router: when the cluster is large, routes query requests to different Broker nodes
  • Indexing Service: a cluster of workers responsible for building indexes for batch and real-time data loads, and for allowing modification of already-ingested data
  • Realtime (Deprecated): loads real-time data into the system; Realtime nodes are easier to set up than the Indexing Service, at the cost of several limitations for production use (a quick way to verify these services are running is sketched after this list)
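In the quickstart configuration each of these services listens on its own HTTP port: Coordinator 8081, Broker 8082, Historical 8083, Overlord 8090, and MiddleManager 8091 (these match the curl targets used later in this post). A minimal health-check sketch, assuming those default ports:

# every Druid service exposes a /status endpoint; a JSON reply means it is up
for port in 8081 8082 8083 8090 8091; do
  curl -s "http://localhost:${port}/status"; echo
done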

Data Format

(Figure: time-series database data format)

  1. DataSource: Druid's basic data structure; logically it can be thought of as a table in a relational database. It contains three kinds of columns: timestamp, dimensions, and metrics (a sample event is sketched after this list)
    • Timestamp column: the timestamp is treated separately because all of our queries are centered on time
    • Dimension columns: dimensions correspond to the attributes of an event and are typically used to filter data. The data in our example has four dimensions: publisher, advertiser, gender, and country. Each of them can be regarded as a subject of the data we select
    • Metric columns: metrics are the columns used for aggregation and computation. In our example, click and price are metrics. Metrics are usually numeric and support operations such as count, sum, and mean. In OLAP terminology they are also called measures
  2. Segment: the data format Druid uses to store indexed data. Segments are partitioned by time span, and the partitioning is configurable through segmentGranularity (the time granularity by which the index is divided)
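To make the three column types concrete, here is a hypothetical event in the publisher/advertiser example above: time is the timestamp column; publisher, advertiser, gender, and country are the dimensions; click and price are the metrics (all field values are made up):

{
  "time": "2015-09-12T00:47:00Z",
  "publisher": "example-site.com",
  "advertiser": "example-ads.com",
  "gender": "male",
  "country": "USA",
  "click": 1,
  "price": 0.53
}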

Write Path

  • Committing Data(before commit) A datasource can consist of anywhere from one to many thousands of segments. Each segment starts out on a MiddleManager, where it is mutable and uncommitted. To produce a segment that is compact and supports fast (inverted-index) queries, the following steps are applied:
    • conversion to columnar storage (columnar format)
    • bitmap indexing with bitmap compression (bitmap)
    • compression (RLE)
    • string encoding (mapping dict)
    • type-aware compression
  • Committed Data(commit) Segments are periodically flushed to deep storage (when they grow too old or too large); after the flush they become immutable and are handed off from the MiddleManager to a Historical. An entry describing the flushed segment is written to the metadata store. The entry is the segment's self-description, including its schema, its size, and its location in deep storage; the Coordinator reads this information from the metadata store (MySQL, PostgreSQL) to locate the actual data (a sketch of such an entry follows)
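A sketch of what such a metadata entry's payload might look like; the field values are hypothetical and the exact schema depends on the Druid version:

{
  "dataSource": "wikipedia",
  "interval": "2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z",
  "version": "2015-09-12T00:00:00.000Z",
  "loadSpec": { "type": "local", "path": "/path/to/segment/index.zip" },
  "dimensions": "channel,cityName,page,user",
  "metrics": "",
  "size": 5537610,
  "identifier": "wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2015-09-12T00:00:00.000Z"
}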

Query Path

  • Query Basic Flow(basic flow) Queries first arrive at the Broker. The Broker determines which segments on the Historicals and MiddleManagers could contain data for the query, then sends a rewritten subquery to those Historicals and MiddleManagers. The two node types execute the subquery and each returns its partial results to the Broker, which merges them and returns the final result to the caller

  • Optimization Method(query optimization)

    1. Only fetch the segments relevant to the query (see the metadata sketch after this list)
    2. Within each segment, use indexes to identify which rows are needed
    3. Once the rows are known, use the columnar storage to read only the relevant columns rather than entire rows
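Step 1, segment pruning, can be observed from the outside: the Coordinator's metadata API lists the segments of a datasource along with their time intervals, and a query whose intervals do not overlap a segment's interval never touches it. A sketch, assuming the quickstart Coordinator on port 8081 and the wikipedia datasource indexed later in this post:

# list the segments (with their intervals) that exist for a datasource
curl -s "http://localhost:8081/druid/coordinator/v1/metadata/datasources/wikipedia/segments?full"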

Version

  1. Ubuntu 16.04.5 LTS
  2. druid-0.12.3-bin.tar.gz
  3. zookeeper-3.4.10.tar.gz
  4. tutorial-examples.tar.gz

Install

# install druid
tar -xzf druid-0.12.3-bin.tar.gz
tar zxvf tutorial-examples.tar.gz

# install zk
tar -xzf zookeeper-3.4.10.tar.gz

# configure
cd zookeeper-3.4.10
cp conf/zoo_sample.cfg conf/zoo.cfg
./bin/zkServer.sh start

cd druid-0.12.3
bin/init

# Instead of the java -cp commands documented for 0.12.3, the commands from the 0.9.0 docs are used here.
java `cat conf-quickstart/druid/coordinator/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/*" io.druid.cli.Main server coordinator

java `cat conf-quickstart/druid/overlord/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/overlord:lib/*" io.druid.cli.Main server overlord

java `cat conf-quickstart/druid/historical/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/*" io.druid.cli.Main server historical

java `cat conf-quickstart/druid/middleManager/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/middleManager:lib/*" io.druid.cli.Main server middleManager

java `cat conf-quickstart/druid/broker/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/broker:lib/*" io.druid.cli.Main server broker
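Each of the five commands above blocks its terminal. A convenience sketch that launches all five services in the background instead; the log directory is my own choice, not part of the distribution:

mkdir -p log
for svc in coordinator overlord historical middleManager broker; do
  # same invocation as above, detached, with stdout/stderr captured per service
  nohup java `cat conf-quickstart/druid/$svc/jvm.config | xargs` \
    -cp "conf-quickstart/druid/_common:conf-quickstart/druid/$svc:lib/*" \
    io.druid.cli.Main server $svc > log/$svc.log 2>&1 &
done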

(Screenshot: services bootstrapping)


Index data

Try to insert data into Druid.

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-index.json http://localhost:8090/druid/indexer/v1/task

examples/wikipedia-index.json

{
  "type" : "index",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : [
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew",
              "isRobot",
              "isUnpatrolled",
              "metroCode",
              "namespace",
              "page",
              "regionIsoCode",
              "regionName",
              "user",
              { "name": "added", "type": "long" },
              { "name": "deleted", "type": "long" },
              { "name": "delta", "type": "long" }
            ]
          },
          "timestampSpec": {
            "column": "time",
            "format": "iso"
          }
        }
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index",
      "targetPartitionSize" : 5000000,
      "maxRowsInMemory" : 25000,
      "forceExtendableShardSpecs" : true
    }
  }
}
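Submitting the task returns a JSON body of the form {"task":"<id>"}. A sketch for checking the task's progress via the Overlord on port 8090, assuming jq is installed to extract the id:

TASK_ID=$(curl -s -X 'POST' -H 'Content-Type:application/json' \
  -d @examples/wikipedia-index.json \
  http://localhost:8090/druid/indexer/v1/task | jq -r '.task')

# poll until the status leaves RUNNING and reports SUCCESS
curl -s "http://localhost:8090/druid/indexer/v1/task/${TASK_ID}/status"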

(Screenshot: indexing task output)


Query data

Built-in Queries

Try to query data from Druid.

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-top-pages.json 'http://localhost:8082/druid/v2?pretty'

wikipedia-top-pages.json

{
  "queryType" : "topN",
  "dataSource" : "wikipedia",
  "intervals" : ["2015-09-12/2015-09-13"],
  "granularity" : "all",
  "dimension" : "page",
  "metric" : "count",
  "threshold" : 10,
  "aggregations" : [
    {
      "type" : "count",
      "name" : "count"
    }
  ]
}
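For reference, a topN response comes back as an array with one entry per granularity bucket, each carrying the ranked rows; the values below are illustrative:

[ {
  "timestamp" : "2015-09-12T00:46:58.771Z",
  "result" : [
    { "count" : 33, "page" : "Wikipedia:Vandalismusmeldung" },
    { "count" : 28, "page" : "User:Cyde/List of candidates for speedy deletion/Subpage" }
  ]
} ]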

(Screenshot: built-in topN result)

SQL Queries

The topN above is Druid's built-in topN query. With the configuration used so far, SQL is not enabled by default. To enable it, add druid.sql.enable=true to the broker (or common) configuration. The official docs enable it by default in examples/conf/druid/_common/common.runtime.properties, whereas here the services were started from conf-quickstart/druid/broker/runtime.properties. Either file works; what matters is only whether druid.sql.enable=true gets picked up.
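A minimal sketch of the change, assuming the quickstart layout used above (restart the Broker afterwards):

# enable Druid SQL on the quickstart broker
echo 'druid.sql.enable=true' >> conf-quickstart/druid/broker/runtime.properties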

SQL TopN

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-top-pages-sql.json http://localhost:8082/druid/v2/sql

wikipedia-top-pages-sql.json

{
  "query":"SELECT page, COUNT(*) AS Edits FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY page ORDER BY Edits DESC LIMIT 10"
}

(Screenshot: SQL topN result)

Timeseries

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-timeseries-sql.json http://localhost:8082/druid/v2/sql

wikipedia-timeseries-sql.json

{
  "query":"SELECT FLOOR(__time to HOUR) AS HourTime, SUM(deleted) AS LinesDeleted FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY FLOOR(__time to HOUR)"
}

(Screenshot: timeseries result)

GroupBy

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-groupby-sql.json http://localhost:8082/druid/v2/sql

wikipedia-groupby-sql.json

{
  "query":"SELECT channel, SUM(added) FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY channel ORDER BY SUM(added) DESC LIMIT 5"
}

(Screenshot: groupBy result)

Scan

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-scan-sql.json http://localhost:8082/druid/v2/sql

wikipedia-scan-sql.json

{
  "query":"SELECT user, page FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 02:00:00' AND TIMESTAMP '2015-09-12 03:00:00' LIMIT 5"
}

(Screenshot: scan result)

Explain

curl -X 'POST' -H 'Content-Type:application/json' -d @examples/wikipedia-explain-top-pages-sql.json http://localhost:8082/druid/v2/sql

wikipedia-explain-top-pages-sql.json

{
  "query":"EXPLAIN PLAN FOR SELECT page, COUNT(*) AS Edits FROM wikipedia WHERE \"__time\" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY page ORDER BY Edits DESC LIMIT 10"
}

(Screenshot: explain plan output)


Data migration

Sometimes the old cluster can no longer hold new data. One option is to scale the old cluster out; another is to stand up a new cluster. With a new cluster, the historical data must be carried over, otherwise searches would be missing the old data. The general steps are:

  • Switch traffic: route new data to the new cluster
  • Move data: migrate the old data to the new cluster

Concretely, moving the old data to the new cluster in Druid works as follows:

  • cp the segments directly from srcPath to destPath:
    hadoop distcp hdfs://nn1:8020/src/path/to/segment/file hdfs://nn1:8020/dest/path/to/segment/file
  • create new metadata for the segments under the new destPath:
    java \
    -cp "/home/chenfh5/project/druid/druid-0.12.3/lib/*" \
    -Ddruid.metadata.storage.type=mysql \
    -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid2 \
    -Ddruid.metadata.storage.connector.user=yourname \
    -Ddruid.metadata.storage.connector.password=yourpwd \
    -Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"] \
    -Ddruid.storage.type=hdfs \
    io.druid.cli.Main tools insert-segment-to-db --workingDir hdfs://nn1:8020/dest/path/to/segment/ --updateDescriptor true
    

    If mysql-metadata-storage is not under the extensions folder, download it from the official site and extract it there.
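One way to fetch it is Druid's bundled pull-deps tool; a sketch, run from the Druid install directory (the Maven coordinate is my assumption for 0.12.3):

# download the mysql-metadata-storage extension into extensions/
java -classpath "lib/*" io.druid.cli.Main tools pull-deps \
  --no-default-hadoop \
  -c "io.druid.extensions:mysql-metadata-storage:0.12.3"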

Dual-cluster migration test

To deploy two clusters on the local VM, and because the insert-segment-to-db tool is involved, either

  1. change -Ddruid.metadata.storage to the default db (Derby), or
  2. change the default metadata storage to MySQL so that it matches -Ddruid.metadata.storage. In the Install section, the Coordinator was started with java `cat conf-quickstart/druid/coordinator/jvm.config | xargs` -cp conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/* io.druid.cli.Main server coordinator, i.e. via conf-quickstart/druid/_common, whose properties are:
# For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over):
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
druid.metadata.storage.connector.host=localhost
druid.metadata.storage.connector.port=1527

Change this to, for MySQL:

druid.extensions.loadList=["mysql-metadata-storage"]

druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://db.example.com:3306/druid
druid.metadata.storage.connector.user=...
druid.metadata.storage.connector.password=...

Also, after switching to MySQL, a "Table doesn't exist" error may appear; solutions for it can be found online.

After that, the cluster can be started.

cluster1

Start the cluster and insert the wikipedia dataset.

cluster2

Run cat conf-quickstart/druid/*/runtime.properties | grep -C2 port to see the ports already occupied by the five roles, then change them so that the second cluster has no port conflicts (a sketch of the overrides follows the screenshots).

(Screenshots: default cluster-1 ports; cluster-2 ports)
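A sketch of the cluster-2 overrides, assuming the 0.12-style druid.port property and a simple +1000 offset; the check command below suggests the second Coordinator ended up on 9081:

# in the second installation's conf-quickstart/druid/<role>/runtime.properties
druid.port=9081   # coordinator   (was 8081)
druid.port=9082   # broker        (was 8082)
druid.port=9083   # historical    (was 8083)
druid.port=9090   # overlord      (was 8090)
druid.port=9091   # middleManager (was 8091)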

  • cp -r /home/chenfh5/project/druid/druid-0.12.3/var/druid/segments/wikipedia/ /home/chenfh5/project/druid/druid-0.12.3.bak/var/druid/segments/
  • insert-segment-to-db

Check

curl -XGET 'http://localhost:9081/druid/coordinator/v1/metadata/datasources?full'


Delete data

# mark segments as "unused"
curl -XDELETE http://localhost:8081/druid/coordinator/v1/datasources/{dataSourceName}
curl -XDELETE http://localhost:8081/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}
# in the URL path the interval uses an underscore separator, e.g. interval = 2016-06-27_2016-06-28
# running a kill task deletes any "unused" segments
curl -X 'POST' -H 'Content-Type:application/json' http://localhost:8090/druid/indexer/v1/task -d'{
  "type": "kill",
  "dataSource": "deletion-tutorial",
  "interval" : "2015-09-12/2015-09-13"
}'
# in the kill task body the "interval" field uses a slash separator, e.g. interval = 2015-09-12/2015-09-13
