Overview
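Today I came across an article on I/O models (reference 1), so I took the chance to organize my notes and try implementing the models on the JVM.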
Points
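IO, also written I/O, is short for input/output. It usually refers to transferring data between memory, external storage, and other peripheral devices.
Linux system calls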
name | description |
---|---|
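recvfrom | receives a message from a socket |
select | lets a program monitor multiple file descriptors (capped at FD_SETSIZE); every descriptor is scanned, O(n) |
poll | waits for one of a set of file descriptors to become ready; the kernel tracks them in a linked list, so there is no FD_SETSIZE cap, but the scan is still O(n) |
epoll | I/O event notification facility; only the ready descriptors are returned, O(1) per event |
sigaction | examines and changes a signal action; used to install the handler for signal-driven I/O (SIGIO), O(1) |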
name | description |
---|---|
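blocking | the caller waits until the task is finished |
non-blocking | the call returns a status value immediately; the caller then polls for progress or is notified asynchronously |
sync | the callee does not notify the caller on completion; the caller waits or polls |
async | the callee notifies the caller once the task is finished |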
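To make the blocking vs. non-blocking row concrete, here is a minimal sketch using `java.nio.channels.Pipe`. The object name `NonBlockingReadSketch` and the `demo` method are made up for illustration. It assumes nothing beyond the JDK: a read on a non-blocking channel with no data available returns 0 right away instead of parking the thread, so the caller has to poll.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of the original code.
object NonBlockingReadSketch {
  // Returns (bytes read while the pipe is still empty, text read after the write).
  def demo(): (Int, String) = {
    val pipe = Pipe.open()
    val source = pipe.source()
    val sink = pipe.sink()
    try {
      source.configureBlocking(false)
      val buf = ByteBuffer.allocate(64)

      // Nothing has been written yet: the non-blocking read returns immediately.
      val emptyRead = source.read(buf)

      val payload = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8))
      while (payload.hasRemaining) sink.write(payload)

      // The caller polls: keep asking until all 4 bytes have arrived.
      while (buf.position() < 4) source.read(buf)
      buf.flip()
      (emptyRead, StandardCharsets.UTF_8.decode(buf).toString)
    } finally {
      source.close()
      sink.close()
    }
  }
}
```

With a blocking source channel (the default), the first `read` would simply not return until some data showed up.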
combination name | description |
---|---|
sync-blocking | easiest, but poor performance (the caller just waits) |
sync-non-blocking | better I/O throughput, but burns more CPU on waiting (active polling) |
async-blocking | often used in replication: the master/source/primary side is sync-blocking, while the slave/replica/follower side is async-blocking |
async-non-blocking | the most complicated, e.g. gossip protocols; suitable for small messages sent at high frequency |
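The difference between waiting, polling, and being notified can be sketched with `java.util.concurrent.CompletableFuture`. This is only an illustration: `CallStylesSketch` and `slowTask` are hypothetical names, and the 50 ms sleep stands in for a real I/O operation.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.function.{Consumer, Supplier}

// Hypothetical demo object, not part of the original code.
object CallStylesSketch {
  // Stand-in for a slow I/O operation (assumption: 50 ms is "slow enough").
  private def slowTask(): CompletableFuture[String] =
    CompletableFuture.supplyAsync(new Supplier[String] {
      def get(): String = { Thread.sleep(50); "done" }
    })

  // sync-blocking: the caller parks until the result is ready.
  def blocking(): String = slowTask().get()

  // sync-non-blocking: the caller keeps control and checks the status itself.
  // Returns the result and how many polls came back "not yet".
  def polling(): (String, Int) = {
    val f = slowTask()
    var polls = 0
    while (!f.isDone) { polls += 1; Thread.sleep(5) }
    (f.get(), polls)
  }

  // async: the callee notifies the caller through a callback on completion.
  def callback(): String = {
    val seen = new AtomicReference[String]("")
    val latch = new CountDownLatch(1)
    slowTask().thenAccept(new Consumer[String] {
      def accept(s: String): Unit = { seen.set(s); latch.countDown() }
    })
    latch.await(5, TimeUnit.SECONDS) // only here so the demo can return the value
    seen.get()
  }
}
```

In real code the callback version would not wait on a latch at all; the caller would just move on and let the callback do the follow-up work.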
name | description | example |
---|---|---|
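BIO | blocking I/O, synchronous and blocking | keep waiting until the task is done |
NIO | non-blocking I/O, synchronous and non-blocking | after submitting, do not keep waiting; come back at intervals to check whether it has finished, i.e. polling |
AIO | asynchronous non-blocking I/O | neither keep waiting nor check back at intervals; get notified once the task completes |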
Code
import java.io._
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths, StandardOpenOption}
object JavaIO {
  def main(args: Array[String]): Unit = {
    bio
    nio
    aio
  }
  def bio: Unit = {
    case class User(name: String, age: Int)
    val user = User("hello bio", 9)
    val fileName = "tmp-bio"
    // write obj to file
    var oos: ObjectOutputStream = null
    try {
      oos = new ObjectOutputStream(new FileOutputStream(fileName))
      oos.writeObject(user)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // oos is still null if the stream could not be opened
      if (oos != null) oos.close()
    }
    // read obj from file
    var ois: ObjectInputStream = null
    try {
      ois = new ObjectInputStream(new FileInputStream(new File(fileName)))
      val newUser = ois.readObject.asInstanceOf[User]
      println(newUser)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ois != null) ois.close()
    }
  }
  // only channels and buffers are used here; there is no server-side selector
  // check reference 5,7 for selector code
  def nio: Unit = {
    val fileName = "tmp-nio"
    // write text to file
    var fos: FileOutputStream = null
    try {
      fos = new FileOutputStream(new File(fileName))
      val channel = fos.getChannel
      val src = StandardCharsets.UTF_8.encode("hello nio, \nhello nio2, \nhello nio3, \nhello nio4")
      // a single write may not drain the buffer, so loop until it is empty
      while (src.hasRemaining) channel.write(src)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (fos != null) fos.close() // also closes the channel
    }
    // read text from file
    var fis: FileInputStream = null
    try {
      fis = new FileInputStream(new File(fileName))
      val channel = fis.getChannel
      val bf = ByteBuffer.allocate(128)
      // read fills the buffer, flip prepares it for draining, clear resets it for the next read
      while (channel.read(bf) != -1) {
        bf.flip()
        println(StandardCharsets.UTF_8.decode(bf))
        bf.clear()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (fis != null) fis.close() // also closes the channel
    }
  }
  def aio: Unit = {
    val fileName = "tmp-aio"
    // write text to file
    val wChannel = AsynchronousFileChannel.open(Paths.get(fileName), StandardOpenOption.WRITE, StandardOpenOption.CREATE)
    // counted down once per finished write, so we know when it is safe to read
    val pending = new java.util.concurrent.CountDownLatch(3)
    val handler = new CompletionHandler[Integer, Object]() {
      override def completed(result: Integer, attachment: Object): Unit = {
        println("attachment=%s, result=%d bytes written".format(attachment, result))
        pending.countDown()
      }
      override def failed(exc: Throwable, attachment: Object): Unit = {
        println("attachment=%s, failed with=%s".format(attachment, exc.getMessage))
        pending.countDown()
      }
    }
    // each write gets its own file position; writing all three at 0 would overwrite each other
    wChannel.write(ByteBuffer.wrap("hello".getBytes), 0, "1st write", handler)
    wChannel.write(ByteBuffer.wrap("aio".getBytes), 5, "2nd write", handler)
    wChannel.write(ByteBuffer.wrap("3".getBytes), 8, "3rd write", handler)
    pending.await()
    wChannel.close()
    // read text from file
    // should multi-thread here since async, like producer-consumer
    val rChannel = AsynchronousFileChannel.open(Paths.get(fileName), StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)
    val buffer = ByteBuffer.allocate(128)
    val result = rChannel.read(buffer, 0) // returns a Future instead of blocking
    while (!result.isDone) {
      println("Do something else while reading is in progress...")
    }
    val bytesRead = result.get()
    println("reading done, bytes read from file=%d".format(bytesRead))
    buffer.flip()
    while (buffer.hasRemaining) {
      println(buffer.get.toChar)
    }
    buffer.clear()
    rChannel.close()
  }
}
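Since the AIO callbacks are awkward to sequence by hand, one option is to adapt `CompletionHandler` to a Scala `Future`. The sketch below is just one way to do it, not the approach of the referenced articles: `AioFutureSketch`, `writeAt`, `readAll` and `demo` are hypothetical names, a temp file replaces the hard-coded file name, and it assumes each small write and read completes in a single call.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardOpenOption}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper object, not part of the original code.
object AioFutureSketch {
  // Completes the promise from the channel's callback thread.
  private def handler(p: Promise[Int]): CompletionHandler[Integer, AnyRef] =
    new CompletionHandler[Integer, AnyRef] {
      override def completed(result: Integer, attachment: AnyRef): Unit = p.success(result.intValue())
      override def failed(exc: Throwable, attachment: AnyRef): Unit = p.failure(exc)
    }

  // Assumption: the text is small enough to be written in one call.
  def writeAt(ch: AsynchronousFileChannel, text: String, position: Long): Future[Int] = {
    val p = Promise[Int]()
    ch.write[AnyRef](ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)), position, null, handler(p))
    p.future
  }

  // Assumption: the whole file fits into `capacity` bytes.
  def readAll(ch: AsynchronousFileChannel, capacity: Int): Future[String] = {
    val buf = ByteBuffer.allocate(capacity)
    val p = Promise[Int]()
    ch.read[AnyRef](buf, 0L, null, handler(p))
    p.future.map { _ => buf.flip(); StandardCharsets.UTF_8.decode(buf).toString }
  }

  def demo(): String = {
    val path = Files.createTempFile("aio-sketch", ".txt")
    try {
      val w = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE)
      try {
        // Each write starts only after the previous one completed, at the next offset.
        val written = for {
          a <- writeAt(w, "hello", 0L)
          b <- writeAt(w, "aio", a.toLong)
          _ <- writeAt(w, "3", (a + b).toLong)
        } yield ()
        Await.result(written, 5.seconds)
      } finally w.close()
      val r = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
      try Await.result(readAll(r, 128), 5.seconds) finally r.close()
    } finally Files.deleteIfExists(path)
  }
}
```

The `Await.result` calls are only there so the demo can return a value; in an actual async pipeline the futures would be composed further instead of blocked on.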
Illustrations
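The illustrations below help reinforce the ideas.
BIO: for every client that connects, the server has to create a new thread, which is too expensive. From reference 4.
BIO, from reference 7.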
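As a rough sketch of the thread-per-connection model described in the BIO captions, here is a tiny echo server built on `java.net.ServerSocket`. The names `BioEchoSketch`, `start` and `echoOnce` are made up for this example, and the line-based echo protocol is an assumption chosen only to keep it short.

```scala
import java.io.{BufferedReader, IOException, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{InetAddress, ServerSocket, Socket}
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of the original code.
object BioEchoSketch {
  // Starts an echo server on an ephemeral port; every accepted client gets its own thread.
  def start(): ServerSocket = {
    val server = new ServerSocket(0)
    val acceptor = new Thread(new Runnable {
      def run(): Unit =
        try {
          while (true) {
            val client = server.accept() // blocks until a client connects
            val worker = new Thread(new Runnable { def run(): Unit = serve(client) })
            worker.setDaemon(true)
            worker.start()
          }
        } catch {
          case _: IOException => () // server socket closed: stop accepting
        }
    })
    acceptor.setDaemon(true)
    acceptor.start()
    server
  }

  // Echoes lines back until the client disconnects; this thread is tied up the whole time.
  private def serve(client: Socket): Unit =
    try {
      val in = new BufferedReader(new InputStreamReader(client.getInputStream, StandardCharsets.UTF_8))
      val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
      var line = in.readLine() // blocks until a full line arrives
      while (line != null) {
        out.println(line)
        line = in.readLine()
      }
    } catch {
      case _: IOException => ()
    } finally client.close()

  // Blocking client: sends one line and waits for the echo.
  def echoOnce(port: Int, msg: String): String = {
    val s = new Socket(InetAddress.getLoopbackAddress, port)
    try {
      val out = new PrintWriter(new OutputStreamWriter(s.getOutputStream, StandardCharsets.UTF_8), true)
      val in = new BufferedReader(new InputStreamReader(s.getInputStream, StandardCharsets.UTF_8))
      out.println(msg)
      in.readLine()
    } finally s.close()
  }
}
```

With many idle clients, most of these worker threads would just sit blocked in `readLine`, which is the cost the caption points at.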
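NIO, from reference 4.
NIO: each channel has its own buffer and is registered with a selector, so the server only needs to maintain a single selector thread. From reference 5.
This selector thread takes O(n) to walk the selectedKeys and find the next ready one.
NIO, from reference 7.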
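The Code section skips the selector, so here is a minimal sketch of the single-selector-thread model from the NIO captions. It is not taken from the referenced articles: `SelectorEchoSketch`, `Server` and `echoOnce` are hypothetical names, the server binds to the loopback address on an ephemeral port, and the echo path simply spins if the send buffer is full.

```scala
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of the original code.
object SelectorEchoSketch {
  final class Server {
    private val selector = Selector.open()
    private val listener = ServerSocketChannel.open()
    listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    listener.configureBlocking(false)
    listener.register(selector, SelectionKey.OP_ACCEPT)
    @volatile private var running = true
    val port: Int = listener.socket().getLocalPort

    private val loop = new Thread(new Runnable { def run(): Unit = runLoop() })
    loop.setDaemon(true)
    loop.start()

    private def runLoop(): Unit =
      try {
        while (running) {
          selector.select() // one thread waits on all registered channels
          val it = selector.selectedKeys().iterator()
          while (it.hasNext) { // walk over the ready keys
            val key = it.next()
            it.remove()
            if (key.isValid && key.isAcceptable) accept()
            else if (key.isValid && key.isReadable) echo(key)
          }
        }
      } finally {
        val keys = selector.keys().iterator()
        while (keys.hasNext) keys.next().channel().close()
        selector.close()
      }

    private def accept(): Unit = {
      val client = listener.accept()
      if (client != null) {
        client.configureBlocking(false)
        // one buffer per channel, stored as the key's attachment
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(256))
      }
    }

    private def echo(key: SelectionKey): Unit = {
      val client = key.channel().asInstanceOf[SocketChannel]
      val buf = key.attachment().asInstanceOf[ByteBuffer]
      try {
        buf.clear()
        if (client.read(buf) == -1) client.close() // closing the channel also cancels its key
        else { buf.flip(); while (buf.hasRemaining) client.write(buf) }
      } catch {
        case _: IOException => client.close()
      }
    }

    def stop(): Unit = { running = false; selector.wakeup(); loop.join(1000) }
  }

  // Blocking client: sends msg and reads back the same number of bytes.
  def echoOnce(port: Int, msg: String): String = {
    val ch = SocketChannel.open(new InetSocketAddress(InetAddress.getLoopbackAddress, port))
    try {
      val bytes = msg.getBytes(StandardCharsets.UTF_8)
      val out = ByteBuffer.wrap(bytes)
      while (out.hasRemaining) ch.write(out)
      val in = ByteBuffer.allocate(bytes.length)
      while (in.hasRemaining && ch.read(in) != -1) {}
      in.flip()
      StandardCharsets.UTF_8.decode(in).toString
    } finally ch.close()
  }
}
```

Compared with the thread-per-connection model, idle clients cost only a registered key and a buffer here, not a parked thread.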
Summary, from reference 6.