ScyllaDB: points of interest

Overview

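I recently came across ScyllaDB's comparison pages against other databases. They are fairly comprehensive, and in the comparison with Cassandra ScyllaDB comes out well ahead, so I decided to take a look.

It is a column-based (wide-column) NoSQL database rewritten in C++ on top of Seastar [9][10], and it implements the AP side of CAP:

  • masterless with a hash ring: this is P
  • replication_factor plus the WAL commit log: this is A. A request only needs at least one replica to succeed, which is why C cannot be guaranteed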

[Figure: CAP theorem]

Version

  • scylladb, 4.1.2
  • scylla-monitoring, 3.4.2

Architecture

read and write flow

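[Figure: read and write flow, credit: Intel]

The dashed line shows a read operation that misses the key cache. In that case the partition summary is searched to locate the entry in the partition index, and the index, through the compression offset map, gives the position of the data in the SSTable [15].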
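
The lookup order described above can be sketched as a toy model in Go. This is only an illustration under simplifying assumptions: the `sstable` struct, its field names, `summaryStep`, and the sample offsets are all hypothetical and do not reflect Scylla's actual on-disk structures.

```go
package main

import (
	"fmt"
	"sort"
)

// indexEntry maps a partition key to the uncompressed offset of its data.
type indexEntry struct {
	key    string
	offset int
}

// sstable is a toy model; every field here is illustrative only.
type sstable struct {
	keyCache     map[string]int // partition key -> data offset, for recently read keys
	summary      []int          // positions in index of every summaryStep-th entry
	index        []indexEntry   // partition index, sorted by key
	chunkSize    int            // uncompressed bytes covered by one compressed chunk
	chunkOffsets []int          // compression offset map: chunk number -> file offset
}

const summaryStep = 2 // hypothetical sampling interval

func newSSTable(entries []indexEntry, chunkSize int, chunkOffsets []int) *sstable {
	sort.Slice(entries, func(i, j int) bool { return entries[i].key < entries[j].key })
	t := &sstable{keyCache: map[string]int{}, index: entries, chunkSize: chunkSize, chunkOffsets: chunkOffsets}
	for i := 0; i < len(entries); i += summaryStep {
		t.summary = append(t.summary, i)
	}
	return t
}

// locate returns the file offset of the chunk holding key, plus the path taken.
func (t *sstable) locate(key string) (chunkOffset int, path string, found bool) {
	offset, hit := t.keyCache[key]
	path = "key cache"
	if !hit {
		// Key cache miss: the summary narrows the search to a small slice of the index.
		path = "partition summary -> partition index"
		s := sort.Search(len(t.summary), func(i int) bool { return t.index[t.summary[i]].key > key })
		if s == 0 {
			return 0, path, false // key sorts before the first indexed key
		}
		start := t.summary[s-1]
		end := start + summaryStep
		if end > len(t.index) {
			end = len(t.index)
		}
		for i := start; i < end; i++ {
			if t.index[i].key == key {
				offset, hit = t.index[i].offset, true
				t.keyCache[key] = offset // remember the position for the next read
				break
			}
		}
		if !hit {
			return 0, path, false
		}
	}
	// Both paths end at the compression offset map.
	return t.chunkOffsets[offset/t.chunkSize], path + " -> compression offset map", true
}

func main() {
	tbl := newSSTable([]indexEntry{{"a", 0}, {"b", 70}, {"c", 130}, {"d", 250}}, 100, []int{0, 40, 95})
	for i := 0; i < 2; i++ {
		off, path, ok := tbl.locate("c")
		fmt.Println(off, path, ok)
	}
}
```

The first lookup of "c" goes through the summary and the index; the second one is served from the key cache and goes straight to the compression offset map.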

partitioning

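A hash ring is used to find the necessary nodes [5]. When nodes are added or removed, the whole topology changes, which triggers an automatic rebalance.

Whether the data is skewed depends on how uniformly hash(key) is distributed.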
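
A minimal sketch of such a ring in Go, to make the lookup concrete. It is written under stated assumptions: FNV-1a from the standard library stands in for the real partitioner hash, and the `ring` type, `newRing`, `replicas`, and the number of tokens per node are hypothetical names and values chosen for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a toy token ring. FNV-1a and the token layout are stand-ins,
// not the partitioner a real cluster uses.
type ring struct {
	tokens []uint64          // sorted tokens
	owner  map[uint64]string // token -> node
}

func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// newRing places vnodes tokens on the ring for each node.
func newRing(nodes []string, vnodes int) *ring {
	r := &ring{owner: map[uint64]string{}}
	for _, n := range nodes {
		for v := 0; v < vnodes; v++ {
			tok := hash64(fmt.Sprintf("%s#%d", n, v))
			r.tokens = append(r.tokens, tok)
			r.owner[tok] = n
		}
	}
	sort.Slice(r.tokens, func(i, j int) bool { return r.tokens[i] < r.tokens[j] })
	return r
}

// replicas walks clockwise from hash(key) and returns the first rf distinct nodes.
func (r *ring) replicas(key string, rf int) []string {
	h := hash64(key)
	start := sort.Search(len(r.tokens), func(i int) bool { return r.tokens[i] >= h })
	seen := map[string]bool{}
	var out []string
	for i := 0; i < len(r.tokens) && len(out) < rf; i++ {
		n := r.owner[r.tokens[(start+i)%len(r.tokens)]]
		if !seen[n] {
			seen[n] = true
			out = append(out, n)
		}
	}
	return out
}

func main() {
	r := newRing([]string{"some-scylla", "some-scylla2", "some-scylla3"}, 16)
	counts := map[string]int{}
	for i := 0; i < 3000; i++ {
		counts[r.replicas(fmt.Sprintf("key-%d", i), 1)[0]]++
	}
	fmt.Println(counts)                  // share of keys whose first replica is each node
	fmt.Println(r.replicas("key-42", 3)) // replica set for one key with rf = 3
}
```

The per-node counts printed by `main` give a rough feel for skew: the more evenly the hash spreads tokens and keys, the closer the three counts are.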

PRIMARY KEY serves two purposes:

  1. The partition key of the table groups rows on the same replica set; it determines which nodes in the cluster store the data.
  2. The clustering columns control how those rows are stored on each replica/node, that is, how the data is laid out on the nodes chosen in step 1.

PRIMARY KEY ((a, b), c, d): a and b compose the partition key, and c and d are the clustering columns.
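
The two roles can be illustrated with a small in-memory sketch in Go. It only models the grouping and ordering; the `row`, `partitionKey`, and `store` names and the sample values are hypothetical, and real storage is of course not a Go map.

```go
package main

import (
	"fmt"
	"sort"
)

// row mirrors a table declared with PRIMARY KEY ((a, b), c, d).
type row struct {
	a, b string // partition key columns
	c, d int    // clustering columns
}

// partitionKey is the part that would be hashed to pick the replica set.
type partitionKey struct{ a, b string }

// store groups rows by partition key and keeps each partition sorted by (c, d).
func store(rows []row) map[partitionKey][]row {
	parts := map[partitionKey][]row{}
	for _, r := range rows {
		k := partitionKey{r.a, r.b}
		parts[k] = append(parts[k], r)
	}
	for _, p := range parts {
		sort.Slice(p, func(i, j int) bool {
			if p[i].c != p[j].c {
				return p[i].c < p[j].c
			}
			return p[i].d < p[j].d
		})
	}
	return parts
}

func main() {
	parts := store([]row{
		{"eu", "web", 2, 1},
		{"eu", "web", 1, 9},
		{"us", "web", 1, 1},
		{"eu", "web", 1, 3},
	})
	fmt.Println(parts[partitionKey{"eu", "web"}]) // one partition, ordered by c then d
	fmt.Println(parts[partitionKey{"us", "web"}]) // a different partition
}
```

Rows that share (a, b) end up together, and within that group they are ordered by c first and d second.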

mapping

scylladb    mysql
cluster     instance
keyspace    database
table       table
type        user-defined data type

AddOn

Scylla Manager & Scylla Monitoring Stack: monitor each node and inspect the cluster

[Figure: overview, credit: scylladb]

[Figure: overview 2, port usage, credit: scylladb]

[Figure: ingest from batch and streaming [2]]

Install

scylladb

Following the relevant steps below, let's set up a simple cluster,

  1. Prepare: create docker-compose.yml with the following content,
     version: '3'
        
     services:
       some-scylla:
         image: scylladb/scylla
         container_name: some-scylla
        
       some-scylla2:
         image: scylladb/scylla
         container_name: some-scylla2
         command: --seeds=some-scylla
        
       some-scylla3:
         image: scylladb/scylla
         container_name: some-scylla3
         command: --seeds=some-scylla
    

    Argh 😤, with compose the Scylla targets never come up in Prometheus, so I fell back to the steps from the best-practice guide [5].

    [Figure: failed with compose]

    That is,

     docker run --name some-scylla scylladb/scylla
     docker run --name some-scylla2 scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
     docker run --name some-scylla3 scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
    

    [Figure: passed with single-command]

  2. Start
    • docker-compose up -d
  3. Verify
    • docker exec -it some-scylla scylla --version
    • docker logs some-scylla | tail
    • nodetool
      • docker exec -it some-scylla nodetool status
      • docker exec -it some-scylla nodetool describecluster (shows cluster information)
      • docker exec -it some-scylla nodetool help
    • cqlsh
      • docker exec -it some-scylla cqlsh
      • describe tables;
      • select * from system_schema.scylla_tables limit 13;
    • view/modify the cluster configuration
      • docker exec -it some-scylla grep --color 'cluster' /etc/scylla/scylla.yaml
      • docker exec -it some-scylla /bin/bash
      • docker exec -it some-scylla cqlsh -e "UPDATE system.local SET cluster_name = 'my_cluster' WHERE key='local';"

monitoring

  1. download
     wget https://github.com/scylladb/scylla-monitoring/archive/scylla-monitoring-3.4.2.tar.gz
     tar -xvf scylla-monitoring-3.4.2.tar.gz
     cd scylla-monitoring-scylla-monitoring-3.4.2
    
  2. Prepare: create scylla-monitoring-scylla-monitoring-3.4.2/prometheus/scylla_servers.yml
     - targets:
         - 172.17.0.2:9180
         - 172.17.0.3:9180
         - 172.17.0.4:9180
       labels:
         cluster: 'Test Cluster'
         dc: datacenter1
    
     The values must match your own cluster; use nodetool to validate them.
  3. Start
    • On a Mac, start-all.sh cannot find readlink, so add the following command at the top of start-all.sh
      • alias readlink=greadlink
    • sh start-all.sh
  4. Verify
    • http://localhost:3000/
  5. Data access
     docker exec -it some-scylla cqlsh
     CREATE KEYSPACE my_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
     USE my_keyspace;
     CREATE TABLE students (id int PRIMARY KEY, name text, courses map<text, double>);
        
     DESCRIBE KEYSPACE my_keyspace;
     DESCRIBE TABLE students;
        
     INSERT INTO students (id, name, courses) VALUES (1, '张三', {'语文': 71.1, '数学': 82, '英语': 93.3});
     INSERT INTO students (id, name, courses) VALUES (2, '李四', {'语文': 91.1, '数学': 82, '英语': 73.3});
     select * from students;
    


manager

  • Install
  • Features
    • A cluster management system; through it you can use the CLI to handle a range of tasks [8]

benchmark

cassandra-stress [13]

A 3-node cluster was set up above. To use this benchmark tool, I start a separate new cluster so that the tool runs isolated from the cluster under test,

  1. new bm cluster
    • docker run --name some-scylla-bm-only -d scylladb/scylla
  2. enter bm cluster
    • docker exec -it some-scylla-bm-only /bin/bash
  3. run bm cmd
      • cassandra-stress write n=1000000 -rate threads=64 -node 172.17.0.2 -graph file=graph.html title=awesome revision=bm-write
      • cassandra-stress write no-warmup n=1000000 -rate threads=64 -node 172.17.0.2 -graph file=graph.html title=awesome revision=bm-no_warmup_write
      • cassandra-stress read n=1000000 -rate threads=64 -node 172.17.0.2 -graph file=graph.html title=awesome revision=bm-read
      • cassandra-stress read no-warmup n=1000000 -rate threads=64 -node 172.17.0.2 -graph file=graph.html title=awesome revision=bm-no_warmup_read
    • mixed
      • cassandra-stress mixed ratio\(write=1,read=1\) n=1000000 -rate threads=64 -node 172.17.0.2 -graph file=graph.html title=awesome revision=bm-mixed
      • cassandra-stress mixed ratio\(write=1,read=1\) no-warmup n=1000000 -rate threads=64 -node 172.17.0.2 -graph file=graph.html title=awesome revision=bm-no_warmup_mixed

[Figure: write]

[Figure: read]

[Figure: mixed]

scylla-bench [14]

A benchmark tool at the Go client level,

  1. install
    • go get github.com/scylladb/scylla-bench
  2. run bm cmd
    • scylla-bench -workload sequential -mode write -nodes 127.0.0.1:9042
        mode write
        Results
        Time (avg):	 4m33.860611808s
        Total ops:	 1000000
        Total rows:	 1000000
        Operations/s:	 3746.9349633592005
        Rows/s:		 3746.9349633592005
        Latency:
          max:		 1.160773631s
          99.9th:	 21.037055ms
          99th:		 10.813439ms
          95th:		 6.750207ms
          90th:		 5.439487ms
          median:	 2.949119ms
          mean:		 3.464229ms        
      
    • scylla-bench -workload sequential -mode read -nodes 127.0.0.1:9042
        mode read
        Results
        Time (avg):	 4m11.740969357s
        Total ops:	 1000000
        Total rows:	 1000000
        Operations/s:	 3972.3625839390193
        Rows/s:		 3972.3625839390193
        Latency:
          max:		 1.015545855s
          99.9th:	 21.626879ms
          99th:		 12.124159ms
          95th:		 7.766015ms
          90th:		 6.324223ms
          median:	 3.473407ms
          mean:		 4.024346ms
      
    • Because I ran this benchmark command on the host, I added -p 9042:9042 to docker run when starting some-scylla

connector

spark batch writer/reader [16]

"org.apache.spark"   %% "spark-sql"                 % "3.0.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "2.5.1",
"joda-time"          % "joda-time"                  % "2.10.6",

package io.github.chenfh5.scylladb

import com.datastax.spark.connector._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkExample {

  private val keyspaceName = "my_keyspace"
  private val tableName = "students"
  private val scylladbHostIP = "127.0.0.1"

  private val ss = {
    val conf = new SparkConf()
    conf.set("spark.app.name", "scylladb_writer_test")
    conf.set("spark.master", "local[1]")
    conf.set("spark.cassandra.connection.host", scylladbHostIP)
    SparkSession.builder().config(conf).getOrCreate()
  }
  import ss.implicits._

  def scan(): Unit = {
    val rdd = ss.sparkContext.cassandraTable(keyspaceName, tableName)
    rdd.take(11).foreach(println)
  }

  def writer(): Unit = {
    val df = ss.sparkContext
      .parallelize((10 to 13).map(i =>
        (i, "我的名字是%d".format(i), Map("语文" -> (i + 0.1), "数学" -> (i + 0.2), "spark" -> (i + 0.3)))))
      .toDF("id", "name", "courses")

    df.show()
    df.write
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", keyspaceName)
      .option("table", tableName)
      .mode(SaveMode.Append)
      .save()
  }

  def read(): Unit = {
    val df = ss.read
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", keyspaceName)
      .option("table", tableName)
      .load
      .filter("id > 2")
      .select("id", "name", "courses")

    df.show(11, truncate = false)
  }

  def main(args: Array[String]): Unit = {
    println("begin")
    scan()
    writer()
    read()
    println("end")
  }

}

flink streaming writer [17]

"org.apache.flink" %% "flink-streaming-scala"     % "1.11.1",
"org.apache.flink" %% "flink-clients"             % "1.11.1",
"org.apache.flink" %% "flink-connector-cassandra" % "1.11.1",

package io.github.chenfh5.scylladb

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

import scala.collection.JavaConverters._
object FlinkExample {

  private val keyspaceName = "my_keyspace"
  private val tableName = "students"
  private val scylladbHostIP = "127.0.0.1"

  def writer(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val source =
      env.fromCollection((20 to 25).map(i =>
        (i, "我的名字是%d".format(i), Map("语文" -> (i + 0.1), "数学" -> (i + 0.2), "flink" -> (i + 0.3)).asJava))) // convert the Scala Map to a java.util.Map for the Cassandra sink

    val result = source.filter(e => e._1 < 24).keyBy(_._1)

    CassandraSink
      .addSink(result)
      .setQuery("INSERT INTO %s.%s(id, name, courses) values (?, ?, ?);".format(keyspaceName, tableName))
      .setHost(scylladbHostIP)
      .build()

    result.print().setParallelism(1)
    env.execute()
  }

  def main(args: Array[String]): Unit = {
    println("begin")
    writer()
    println("end")
  }

}

golang reader [18]

Similar to DoReadsFromTable() or DoScanTable() in scylla-bench [14]

package main

import (
    "fmt"

    "github.com/gocql/gocql"
)

const (
    keyspaceName   = "my_keyspace"
    tableName      = "students"
    scylladbHostIP = "127.0.0.1"
)

func main() {
    // connect to the cluster
    cluster := gocql.NewCluster(scylladbHostIP)
    cluster.Keyspace = keyspaceName
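    session, err := cluster.CreateSession()
    if err != nil {
        fmt.Println("failed to connect:", err)
        return
    }
    defer session.Close()

    // variables that receive the columns of each row
    var id int
    var name string
    var courses map[string]float64

    // list records; token(id) is a bigint, so bind an int64 instead of the string "12"
    iter := session.Query(fmt.Sprintf("SELECT id, name, courses FROM %s WHERE token(id) <= ? LIMIT 10", tableName), int64(12)).Iter()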
    for iter.Scan(&id, &name, &courses) {
        fmt.Println("student:", id, name, courses)
    }
    if err := iter.Close(); err != nil {
        fmt.Println(err)
    }
}
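
The reader above filters on token(id) with a single bound. A common way to turn that into a full-table scan is to split the token space into contiguous ranges and query each range separately. The sketch below shows only the range arithmetic, assuming a signed 64-bit token space (as with a Murmur3-style partitioner); `tokenRange` and `splitTokenRing` are hypothetical names, and this does not claim to be how scylla-bench implements its scan.

```go
package main

import (
	"fmt"
	"math"
)

// tokenRange is an inclusive slice of the signed 64-bit token space.
type tokenRange struct{ start, end int64 }

// splitTokenRing cuts [math.MinInt64, math.MaxInt64] into n contiguous ranges.
func splitTokenRing(n int) []tokenRange {
	if n < 1 {
		n = 1
	}
	step := math.MaxUint64 / uint64(n) // width of each range, as an unsigned offset
	// toToken turns an offset from MinInt64 into a token by flipping the sign bit.
	toToken := func(u uint64) int64 { return int64(u ^ (1 << 63)) }
	ranges := make([]tokenRange, 0, n)
	for i := 0; i < n; i++ {
		lo := uint64(i) * step
		hi := lo + step - 1
		if i == n-1 {
			hi = math.MaxUint64 // the last range absorbs the rounding remainder
		}
		ranges = append(ranges, tokenRange{toToken(lo), toToken(hi)})
	}
	return ranges
}

func main() {
	for _, r := range splitTokenRing(4) {
		// Each pair could be bound as int64 values to a query of this shape.
		fmt.Printf("SELECT id, name, courses FROM students WHERE token(id) >= %d AND token(id) <= %d\n", r.start, r.end)
	}
}
```

Each range can then be handed to its own goroutine or worker, so the scan is spread across the ring instead of walking it with one query.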

Reference

  1. scylladb
  2. Turning messy data into a gold mine using Spark, Flink, and ScyllaDB
  3. Making NoSQL Databases Persistent-Memory-Aware: The Apache Cassandra* Example
  4. Scylla Download Center
  5. Best Practices for Running Scylla on Docker
  6. docker hub scylladb doc
  7. install monitoring
  8. manager CLI
  9. 异步编程框架Seastar介绍
  10. 现代硬件上的高性能C++异步框架-SeaStar
  11. ScyllaDB调研分析
  12. 通用高效的数据修复方法:Row level repair
  13. cassandra-stress
  14. scylla-bench
  15. How Cassandra reads and writes data
  16. spark-cassandra-connector
  17. Flink Cassandra Connector
  18. Scylla Go Driver