-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.go
More file actions
120 lines (108 loc) · 2.35 KB
/
queue.go
File metadata and controls
120 lines (108 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package protocol
import (
"sync"
"sync/atomic"
"time"
)
type queue struct {
poolStart chan bool
poolEnd chan bool
pushLock sync.Mutex
popLock sync.Mutex
maxSize int
curSize int32
wIndex int
rIndex int
queue []*protocolPackage
}
func newQueue(size int) *queue {
if size < 1 {
size = 1
}
return &queue{
// Start和End信号池用于保证push和pop操作不会互相干扰
// 每次Push和Pop操作后,两个信号池中的信号数量都会保持一致
poolStart: make(chan bool, size),
poolEnd: make(chan bool, size),
// 保证push操作完整性
pushLock: sync.Mutex{},
// 保证pop操作完整性
popLock: sync.Mutex{},
// 队列中元素最大数量
maxSize: size,
// 队列当前元素数量
curSize: 0,
// push指针
wIndex: 0,
// pop指针
rIndex: 0,
// 元素数组
queue: make([]*protocolPackage, size),
}
}
func (q *queue) push(item *protocolPackage, timeout int) (res bool) {
q.pushLock.Lock()
defer func() {
// push成功后队列大小+1
atomic.AddInt32(&q.curSize, 1)
q.pushLock.Unlock()
if res {
// 向End信号池发送一个信号,表示完成此次push
q.poolEnd <- true
}
}()
// 操作成功代表队列不满,向Start信号池发送一个信号,表示开始push
if timeout > 0 {
select {
case q.poolStart <- true:
case <-time.After(time.Duration(timeout) * time.Second):
res = false
return
}
} else {
q.poolStart <- true
}
q.queue[q.wIndex] = item
q.wIndex++
if q.wIndex >= q.maxSize {
q.wIndex = 0
}
res = true
return
}
func (q *queue) pop(timeout int) (item *protocolPackage) {
q.popLock.Lock()
defer func() {
// pop成功后队列大小-1
atomic.AddInt32(&q.curSize, -1)
q.popLock.Unlock()
if item != nil {
// 当前元素已经成功取出,释放当前位置
<-q.poolStart
}
}()
// 操作成功代表队列非空,只有End信号池中有信号,才能保证有完整的元素在队列中
if timeout > 0 {
select {
case <-q.poolEnd:
case <-time.After(time.Duration(timeout) * time.Second):
item = nil
return
}
} else {
<-q.poolEnd
}
item = q.queue[q.rIndex]
q.queue[q.rIndex] = nil
q.rIndex++
if q.rIndex >= q.maxSize {
q.rIndex = 0
}
return
}
func (q *queue) size() int32 {
return atomic.LoadInt32(&q.curSize)
}
func (q *queue) isEmpty() bool {
return atomic.LoadInt32(&q.curSize) == 0
}