java io models

Overview

Today I came across an article about IO models (reference 1). This post summarizes it and tries each model out on the JVM.

Points

IO, also written I/O, is short for input/output. It usually refers to the transfer of data between memory, external storage, and other peripheral devices.

Linux system calls

| name | description |
| --- | --- |
| recvfrom | receive messages from a socket |
| select | allows a program to monitor multiple file descriptors, O(n) |
| poll | waits for one of a set of file descriptors; the kernel tracks the set in a linked list, so there is no fixed-size limit as in select, O(n) |
| epoll | event notification facility, O(1) |
| sigaction | examine and change a signal action, O(1) |

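| name | description |
| --- | --- |
| blocking | the caller waits until the task is complete |
| non-blocking | the call returns a status value immediately; the caller then polls for progress or is notified asynchronously |
| sync | completion is not announced; the caller waits or polls |
| async | the caller is notified when the task completes |
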
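The non-blocking case above can be sketched with a `java.nio.channels.Pipe` standing in for a real socket. This is only an illustration under assumptions: the object name `NonBlockingPoll` and the helper `demo` are made up for this post, and the message is assumed to fit in the 64-byte buffer.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of the original article.
object NonBlockingPoll {

  // Sends `msg` through a pipe and reads it back in non-blocking mode.
  // Returns the received text and how many read() calls came back empty.
  // Assumption: `msg` encodes to at most 64 bytes.
  def demo(msg: String): (String, Int) = {
    val pipe = Pipe.open()
    val source = pipe.source()
    source.configureBlocking(false) // read() now returns at once instead of waiting

    val buf = ByteBuffer.allocate(64)
    // nothing has been written yet, so this returns 0 immediately
    var emptyReads = if (source.read(buf) == 0) 1 else 0

    val payload = msg.getBytes(StandardCharsets.UTF_8)
    val out = ByteBuffer.wrap(payload)
    while (out.hasRemaining) pipe.sink().write(out)

    // synchronous non-blocking: the caller keeps asking until the data is there
    while (buf.position() < payload.length) {
      if (source.read(buf) == 0) emptyReads += 1
    }

    buf.flip()
    val text = StandardCharsets.UTF_8.decode(buf).toString
    pipe.sink().close()
    source.close()
    (text, emptyReads)
  }

  def main(args: Array[String]): Unit = {
    val (text, emptyReads) = demo("hello")
    println(s"received '$text' after $emptyReads empty read(s)")
  }
}
```

The first `read` returns 0 rather than blocking, which is the "status value returned immediately" from the table. The loop afterwards is the polling that a synchronous caller has to do itself.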
| combination | description |
| --- | --- |
| sync-blocking | easiest, but poor performance (the caller waits) |
| sync-non-blocking | improves IO, but needs more CPU for waiting (active polling) |
| async-blocking | often used in replication mechanisms: the master/source/primary side is sync-blocking, the slave/replica/follower side is async-blocking |
| async-non-blocking | the most complicated, e.g. a gossip protocol; suitable for small messages at high frequency |

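| name | description | example |
| --- | --- | --- |
| BIO | blocking IO, a synchronous and blocking communication mode | waits the whole time |
| NIO | non-blocking IO, synchronous and non-blocking | after submitting, does not wait the whole time but comes back at intervals to check whether the task is done, i.e. polling |
| AIO | asynchronous non-blocking IO | neither waits nor checks back at intervals; it is notified when the task completes |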
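The three behaviours in the example column (wait, check back, get notified) can be sketched against a simulated slow task. This is a rough illustration, not real IO: `slowTask` just sleeps, and the names `WaitPollNotify`, `blockingStyle`, `pollingStyle` and `callbackStyle` are invented for this sketch.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.function.{Consumer, Supplier}

// Hypothetical demo object, not part of the original article.
object WaitPollNotify {

  // Stand-in for a slow IO operation: sleeps on a pool thread, then yields "done".
  def slowTask(millis: Long): CompletableFuture[String] =
    CompletableFuture.supplyAsync(new Supplier[String] {
      override def get(): String = { Thread.sleep(millis); "done" }
    })

  // BIO style: the caller blocks until the task has finished.
  def blockingStyle(millis: Long): String = slowTask(millis).get()

  // NIO style: the caller comes back at intervals to check.
  // Returns the result and the number of checks that found the task unfinished.
  def pollingStyle(millis: Long): (String, Int) = {
    val task = slowTask(millis)
    var checks = 0
    while (!task.isDone) {
      checks += 1
      Thread.sleep(5) // the caller could do other work here
    }
    (task.get(), checks)
  }

  // AIO style: the caller registers a callback and is notified on completion.
  def callbackStyle(millis: Long): String = {
    val received = new AtomicReference[String]()
    val notified = new CountDownLatch(1)
    slowTask(millis).thenAccept(new Consumer[String] {
      override def accept(value: String): Unit = {
        received.set(value)
        notified.countDown()
      }
    })
    // the latch only keeps this demo from returning before the callback has run
    notified.await(5, TimeUnit.SECONDS)
    received.get()
  }

  def main(args: Array[String]): Unit = {
    println(blockingStyle(50))
    println(pollingStyle(50))
    println(callbackStyle(50))
  }
}
```

All three variants end up with the same value. They differ only in what the calling thread does while the task runs.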

Code

import java.io._
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths, StandardOpenOption}

object JavaIO {

  def main(args: Array[String]): Unit = {
    bio
    nio
    aio
  }

  def bio = {
    case class User(name: String, age: Int)
    val user = User("hello bio", 9)
    val fileName = "tmp-bio"

    // write obj to file
    var oos: ObjectOutputStream = null
    try {
      oos = new ObjectOutputStream(new FileOutputStream(fileName))
      oos.writeObject(user)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally if (oos != null) oos.close() // oos is still null if the stream could not be opened

    // read obj from File
    var ois: ObjectInputStream = null
    try {
      ois = new ObjectInputStream(new FileInputStream(new File(fileName)))
      val newUser = ois.readObject.asInstanceOf[User]
      println(newUser)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally if (ois != null) ois.close() // ois is still null if the stream could not be opened
  }

  // this example only uses a channel and a buffer; it does not use a server-side selector
  // see references 5 and 7 for selector code
  def nio = {
    val fileName = "tmp-nio"
    // write text to file
    var fos: FileOutputStream = null
    try {
      fos = new FileOutputStream(new File(fileName))
      val channel = fos.getChannel
      val src = StandardCharsets.UTF_8.encode("hello nio, \nhello nio2, \nhello nio3, \nhello nio4")
      // write() may consume only part of the buffer, so loop until it is drained
      while (src.hasRemaining) channel.write(src)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally if (fos != null) fos.close()

    // read text from file
    var fis: FileInputStream = null
    try {
      fis = new FileInputStream(new File(fileName))
      val channel = fis.getChannel
      val bf = ByteBuffer.allocate(128)

      // read() returns -1 at end of file
      while (channel.read(bf) != -1) {
        bf.flip() // switch the buffer from being filled to being drained
        // a multi-byte character split across two reads would decode badly;
        // that cannot happen with this short ASCII sample
        println(StandardCharsets.UTF_8.decode(bf))
        bf.clear() // make the buffer ready for the next read
      }
      channel.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally if (fis != null) fis.close()
  }
  }

  def aio = {
    val fileName = "tmp-aio"
    // write text to file; the handler is called back when each write completes
    val wChannel = AsynchronousFileChannel.open(Paths.get(fileName), StandardOpenOption.WRITE, StandardOpenOption.CREATE)
    // lets the main thread wait until all three callbacks have fired
    val pending = new java.util.concurrent.CountDownLatch(3)
    val handler = new CompletionHandler[Integer, Object]() {
      override def completed(result: Integer, attachment: Object): Unit = {
        println("attachment=%s, result=%d bytes written".format(attachment, result))
        pending.countDown()
      }

      override def failed(exc: Throwable, attachment: Object): Unit = {
        println("attachment=%s, failed with=%s".format(attachment, exc.getMessage))
        pending.countDown()
      }
    }
    // each write gets its own file position; writing all three at 0 would overwrite each other
    wChannel.write(ByteBuffer.wrap("hello".getBytes), 0, "1st write", handler)
    wChannel.write(ByteBuffer.wrap("aio".getBytes), 5, "2nd write", handler)
    wChannel.write(ByteBuffer.wrap("3".getBytes), 8, "3rd write", handler)
    pending.await() // without this the read below could run before any write has landed
    wChannel.close()

    // read text from file, this time through the Future-based API
    // in real code the reader would run on its own thread, producer-consumer style
    val rChannel = AsynchronousFileChannel.open(Paths.get(fileName), StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)
    val buffer = ByteBuffer.allocate(128)
    val result = rChannel.read(buffer, 0)
    while (!result.isDone) {
      println("Do something else while reading is in progress...")
    }
    val bytesRead = result.get()
    println("reading done, bytes read from file=%d".format(bytesRead))

    buffer.flip()
    while (buffer.hasRemaining) {
      println(buffer.get.toChar)
    }

    buffer.clear()
    rChannel.close()
  }

}

Illustrations

The illustrations below help reinforce the points above.

image

bio, every client that connects makes the server create a new thread, which is too expensive. from 4
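The thread-per-connection model in this caption can be sketched with plain `java.net` sockets. This is a minimal illustration under assumptions, not the code from reference 4: the names `BioEchoServer`, `serve`, `roundTrip` and `demo` are invented here, and the server stops after a fixed number of clients so the demo can end.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{InetAddress, ServerSocket, Socket}
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of the original article.
object BioEchoServer {

  // Accepts `clients` connections and starts one new thread for each of them.
  def serve(server: ServerSocket, clients: Int): Unit = {
    for (_ <- 1 to clients) {
      val socket = server.accept() // blocks until a client connects
      new Thread(new Runnable {
        override def run(): Unit = handle(socket)
      }).start()
    }
  }

  // Echoes every line back; the thread is parked in readLine() while the client is idle.
  private def handle(socket: Socket): Unit = {
    try {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8), true)
      var line = in.readLine() // blocks until a full line arrives or the client closes
      while (line != null) {
        out.println("echo: " + line)
        line = in.readLine()
      }
    } finally socket.close()
  }

  // Blocking client: sends one line and waits for the reply.
  def roundTrip(port: Int, msg: String): String = {
    val socket = new Socket(InetAddress.getLoopbackAddress, port)
    try {
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8), true)
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      out.println(msg)
      in.readLine()
    } finally socket.close()
  }

  // Starts a server on a free port, sends one message, returns the reply.
  def demo(msg: String): String = {
    val server = new ServerSocket(0)
    val acceptor = new Thread(new Runnable {
      override def run(): Unit = serve(server, 1)
    })
    acceptor.start()
    try roundTrip(server.getLocalPort, msg)
    finally { server.close(); acceptor.join() }
  }

  def main(args: Array[String]): Unit = println(demo("hello bio"))
}
```

With n idle clients this design keeps n threads blocked in `readLine()`, which is the cost the caption refers to.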

image

bio, from 7

image

nio, from 4

image

nio, each channel has its own buffer and is registered with a selector, so the server only needs to maintain a single selector thread. from 5

This selector thread takes O(n) to find the next selected keys.
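A single selector thread serving several channels might look roughly like the sketch below. It is an assumption-laden illustration rather than the code from reference 5: the names `NioSelectorEcho`, `serve`, `roundTrip` and `demo` are made up, each channel gets a 256-byte buffer as its attachment, and the loop ends after a fixed number of clients have disconnected so the demo can finish.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of the original article.
object NioSelectorEcho {

  // One thread, one selector: echoes data until `expectedClients` have disconnected.
  def serve(server: ServerSocketChannel, expectedClients: Int): Unit = {
    val selector = Selector.open()
    server.configureBlocking(false)
    server.register(selector, SelectionKey.OP_ACCEPT)
    var closed = 0
    while (closed < expectedClients) {
      selector.select() // blocks until at least one registered channel is ready
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next()
        it.remove() // the selected-key set is not cleared automatically
        if (key.isAcceptable) {
          val client = server.accept()
          if (client != null) {
            client.configureBlocking(false)
            // one buffer per channel, kept as the key's attachment
            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(256))
          }
        } else if (key.isReadable) {
          val client = key.channel().asInstanceOf[SocketChannel]
          val buf = key.attachment().asInstanceOf[ByteBuffer]
          if (client.read(buf) == -1) {
            client.close() // closing the channel also cancels its key
            closed += 1
          } else {
            buf.flip()
            // simplification: spins if the socket cannot take the bytes; fine for small messages
            while (buf.hasRemaining) client.write(buf)
            buf.clear()
          }
        }
      }
    }
    selector.close()
  }

  // Blocking client: sends `msg` and reads back the same number of bytes.
  def roundTrip(port: Int, msg: String): String = {
    val ch = SocketChannel.open(new InetSocketAddress(InetAddress.getLoopbackAddress, port))
    try {
      val bytes = msg.getBytes(StandardCharsets.UTF_8)
      val out = ByteBuffer.wrap(bytes)
      while (out.hasRemaining) ch.write(out)
      val in = ByteBuffer.allocate(bytes.length)
      while (in.hasRemaining && ch.read(in) != -1) {}
      in.flip()
      StandardCharsets.UTF_8.decode(in).toString
    } finally ch.close()
  }

  // Starts the selector loop on a free port and sends each message over its own connection.
  def demo(messages: List[String]): List[String] = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    val port = server.getLocalAddress.asInstanceOf[InetSocketAddress].getPort
    val loop = new Thread(new Runnable {
      override def run(): Unit = serve(server, messages.size)
    })
    loop.start()
    try messages.map(m => roundTrip(port, m))
    finally { loop.join(5000); server.close() }
  }

  def main(args: Array[String]): Unit = println(demo(List("hello nio", "hello nio2")))
}
```

Every connection is handled by the one thread running `serve`. The inner iteration over `selectedKeys()` is the scan the sentence above describes.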

image

nio, from 7

image

summary, from 6

Reference

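  1. 一口气说出5种IO模型 (Five IO models explained in one go)
  2. 什么是 BIO、NIO 和 AIO? (What are BIO, NIO and AIO?)
  3. linux cmd
  4. in-depth understanding of BIO, NIO, AIO
  5. 网络编程IO模式BIO&NIO&AIO (Network programming IO modes: BIO, NIO and AIO), with code
  6. BIO、NIO、AIO 介绍和适用场景分析 (Introduction to BIO, NIO and AIO and analysis of their use cases)
  7. Blocking I/O and non-blocking I/O, with code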