1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
// Wait implements Poll.
//
// It blocks in kevent on p.fd and dispatches every returned event, looping
// until kevent fails. EINTR is ignored and the wait is retried; EBADF is
// treated as a graceful shutdown and reported as nil; any other kevent error
// is returned to the caller.
func (p *defaultPoll) Wait() error {
	// At most maxEvents events are collected per kevent call.
	const maxEvents = 1024

	/*
		Reference (declared elsewhere), as quoted by the original annotations,
		which also note barriercap = 32:

		type barrier struct {
			bs  [][]byte
			ivs []syscall.Iovec // I/O vectors used by readv and sendmsg; sendmsg
			                    // pushes several non-contiguous buffers into the
			                    // kernel with a single system call
		}
	*/
	// Every event slot owns one barrier so that the scratch slices handed to
	// readv/sendmsg are allocated once, up front, and reused on every round.
	events := make([]syscall.Kevent_t, maxEvents)
	barriers := make([]barrier, maxEvents)
	for idx := range barriers {
		scratch := &barriers[idx]
		scratch.bs = make([][]byte, barriercap)
		scratch.ivs = make([]syscall.Iovec, barriercap)
	}

	for {
		// Operators that must be detached after this round: the peer closed
		// the descriptor (EV_EOF) or a read/write failed.
		var hangups []*FDOperator

		// Block until at least one registered descriptor fires.
		count, waitErr := syscall.Kevent(p.fd, nil, events, nil)
		switch waitErr {
		case nil, syscall.EINTR:
			// Proceed; after EINTR the event loop below simply sees whatever
			// count kevent reported.
		case syscall.EBADF:
			// exit gracefully
			return nil
		default:
			return waitErr
		}

		// Index-based on purpose: count is only an upper bound for the loop
		// and is never used to slice events.
		for slot := 0; slot < count; slot++ {
			ev, scratch := &events[slot], &barriers[slot]

			// Ident == 0 is not a descriptor event but the poller's own
			// trigger event; just clear the trigger flag.
			if ev.Ident == 0 {
				atomic.StoreUint32(&p.trigger, 0)
				continue
			}

			// Udata carries the *FDOperator stored at registration time, so
			// the address of Udata is reinterpreted as a **FDOperator and
			// dereferenced to recover it.
			op := *(**FDOperator)(unsafe.Pointer(&ev.Udata))

			// Skip the event when do() refuses the operator (presumably it is
			// already being handled elsewhere; confirm against FDOperator).
			if !op.do() {
				continue
			}

			enabled := ev.Flags&syscall.EV_ENABLE != 0
			switch {
			case ev.Flags&syscall.EV_EOF != 0:
				// Closed descriptors are collected and detached together
				// after the loop.
				hangups = append(hangups, op)

			case enabled && ev.Filter == syscall.EVFILT_READ:
				if op.OnRead != nil {
					// for non-connection: a custom read callback is
					// registered (e.g. by the server for its listening
					// socket, according to the original annotations).
					op.OnRead(p)
				} else if bufs := op.Inputs(scratch.bs); len(bufs) != 0 {
					// only for connection
					/*
						Reference (declared elsewhere), as quoted by the
						original annotations:

						// inputs implements FDOperator.
						func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
							// CAS on the reading state; give up this round
							// if the connection is not readable
							if !c.lock(reading) {
								return rs
							}
							n := int(atomic.LoadInt32(&c.waitReadSize))
							if n <= pagesize {
								return c.inputBuffer.Book(pagesize, vs)
							}
							n -= c.inputBuffer.Len()
							if n < pagesize {
								n = pagesize
							}
							return c.inputBuffer.Book(n, vs)
						}
					*/
					// readv differs from read in that the received bytes can
					// be scattered into several non-contiguous buffers
					// (iovecs).
					nread, readErr := readv(op.FD, bufs, scratch.ivs)
					/*
						Reference (declared elsewhere), as quoted by the
						original annotations:

						// inputAck implements FDOperator.
						func (c *connection) inputAck(n int) (err error) {
							if n < 0 {
								n = 0
							}
							lack := atomic.AddInt32(&c.waitReadSize, int32(-n))
							err = c.inputBuffer.BookAck(n, lack <= 0)
							c.unlock(reading)
							c.triggerRead()
							c.onRequest()
							return err
						}
					*/
					// The ack is issued before the error check, so it runs
					// even when readv failed. Its result is deliberately
					// not inspected here.
					op.InputAck(nread)
					if readErr != nil && readErr != syscall.EAGAIN && readErr != syscall.EINTR {
						log.Printf("readv(fd=%d) failed: %s", op.FD, readErr.Error())
						hangups = append(hangups, op)
					}
				}

			case enabled && ev.Filter == syscall.EVFILT_WRITE:
				if op.OnWrite != nil {
					// for non-connection: a custom write callback is
					// registered.
					op.OnWrite(p)
				} else if bufs, zeroCopy := op.Outputs(scratch.bs); len(bufs) != 0 {
					// only for connection: Outputs yields the pending data
					// and whether zero-copy is supported.
					// TODO: Let the upper layer pass in whether to use ZeroCopy.
					// Until then zero-copy stays disabled regardless of what
					// Outputs reports.
					nsent, sendErr := sendmsg(op.FD, bufs, scratch.ivs, false && zeroCopy)
					// Report how much was written (presumably so the sent
					// buffers can be released; confirm against OutputAck).
					op.OutputAck(nsent)
					// NOTE(review): unlike the read path, EINTR is not
					// tolerated here; confirm that this is intentional.
					if sendErr != nil && sendErr != syscall.EAGAIN {
						log.Printf("sendmsg(fd=%d) failed: %s", op.FD, sendErr.Error())
						hangups = append(hangups, op)
					}
				}
			}

			// Pairs with the successful do() above.
			op.done()
		}

		// hup conns together to avoid blocking the poll.
		if len(hangups) != 0 {
			p.detaches(hangups)
		}
	}
}
|