• R/O
  • HTTP
  • SSH
  • HTTPS

vapor: Commit

Golang implemented sidechain for Bytom


Commit MetaInfo

Revision: 8251b6d84500c2037d9d364d1b007d665bf781de (tree)
Time: 2019-08-05 18:05:16
Author: Yahtoo Ma <yahtoo.ma@gmai...>
Committer: Yahtoo Ma

Log Message

Peer add announces new block message num limit

Change Summary

Incremental Difference

--- a/netsync/consensusmgr/block_fetcher.go
+++ b/netsync/consensusmgr/block_fetcher.go
@@ -1,7 +1,7 @@
11 package consensusmgr
22
33 import (
4- "github.com/sirupsen/logrus"
4+ log "github.com/sirupsen/logrus"
55 "gopkg.in/karalabe/cookiejar.v2/collections/prque"
66
77 "github.com/vapor/p2p/security"
@@ -10,8 +10,8 @@ import (
1010
1111 const (
1212 maxBlockDistance = 64
13- maxMsgSetSize = 128
1413 newBlockChSize = 64
14+ msgLimit = 128 // peer message number limit
1515 )
1616
1717 // blockFetcher is responsible for accumulating block announcements from various peers
@@ -21,24 +21,24 @@ type blockFetcher struct {
2121 peers Peers
2222
2323 newBlockCh chan *blockMsg
24- queue *prque.Prque
25- msgSet map[bc.Hash]*blockMsg
24+ queue *prque.Prque // block import priority queue
25+ msgSet map[bc.Hash]*blockMsg // already queued blocks
26+ msgCounter map[string]int // per peer msg counter to prevent DOS
2627 }
2728
2829 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
2930 func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
30- f := &blockFetcher{
31+ return &blockFetcher{
3132 chain: chain,
3233 peers: peers,
3334 newBlockCh: make(chan *blockMsg, newBlockChSize),
3435 queue: prque.New(),
3536 msgSet: make(map[bc.Hash]*blockMsg),
37+ msgCounter: make(map[string]int),
3638 }
37- go f.blockProcessor()
38- return f
3939 }
4040
41-func (f *blockFetcher) blockProcessor() {
41+func (f *blockFetcher) blockProcessorLoop() {
4242 for {
4343 for !f.queue.Empty() {
4444 msg := f.queue.PopItem().(*blockMsg)
@@ -49,14 +49,25 @@ func (f *blockFetcher) blockProcessor() {
4949
5050 f.insert(msg)
5151 delete(f.msgSet, msg.block.Hash())
52+ f.msgCounter[msg.peerID]--
53+ if f.msgCounter[msg.peerID] <= 0 {
54+ delete(f.msgCounter, msg.peerID)
55+ }
5256 }
53- f.add(<-f.newBlockCh)
57+ f.add(<-f.newBlockCh, msgLimit)
5458 }
5559 }
5660
57-func (f *blockFetcher) add(msg *blockMsg) {
61+func (f *blockFetcher) add(msg *blockMsg, limit int) {
62+ // prevent DOS
63+ count := f.msgCounter[msg.peerID] + 1
64+ if count > limit {
65+ log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit")
66+ return
67+ }
68+
5869 bestHeight := f.chain.BestBlockHeight()
59- if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
70+ if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
6071 return
6172 }
6273
@@ -64,7 +75,8 @@ func (f *blockFetcher) add(msg *blockMsg) {
6475 if _, ok := f.msgSet[blockHash]; !ok {
6576 f.msgSet[blockHash] = msg
6677 f.queue.Push(msg, -float32(msg.block.Height))
67- logrus.WithFields(logrus.Fields{
78+ f.msgCounter[msg.peerID] = count
79+ log.WithFields(log.Fields{
6880 "module": logModule,
6981 "block height": msg.block.Height,
7082 "block hash": blockHash.String(),
@@ -79,7 +91,6 @@ func (f *blockFetcher) insert(msg *blockMsg) {
7991 if peer == nil {
8092 return
8193 }
82-
8394 f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
8495 return
8596 }
@@ -90,12 +101,12 @@ func (f *blockFetcher) insert(msg *blockMsg) {
90101
91102 proposeMsg, err := NewBlockProposeMsg(msg.block)
92103 if err != nil {
93- logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
104+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
94105 return
95106 }
96107
97108 if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
98- logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
109+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
99110 return
100111 }
101112 }
--- a/netsync/consensusmgr/block_fetcher_test.go
+++ b/netsync/consensusmgr/block_fetcher_test.go
@@ -127,7 +127,7 @@ func TestBlockFetcher(t *testing.T) {
127127 },
128128 }
129129 fetcher := newBlockFetcher(newChain(), peers)
130-
130+ go fetcher.blockProcessorLoop()
131131 for i, c := range testCase {
132132 fetcher.processNewBlock(c.blockMsg)
133133 time.Sleep(10 * time.Millisecond)
@@ -137,3 +137,159 @@ func TestBlockFetcher(t *testing.T) {
137137 }
138138 }
139139 }
140+
141+func TestAddBlockMsg(t *testing.T) {
142+ peers := peers.NewPeerSet(&peerMgr{})
143+ testPeer := "peer1"
144+ testCase := []struct {
145+ blocksMsg []*blockMsg
146+ limit int
147+ queueSize int
148+ msgSetSize int
149+ msgCounter int
150+ }{
151+ //normal test
152+ {
153+ blocksMsg: []*blockMsg{
154+ {
155+ block: &types.Block{
156+ BlockHeader: types.BlockHeader{
157+ Height: 100,
158+ },
159+ },
160+ peerID: testPeer,
161+ },
162+ {
163+ block: &types.Block{
164+ BlockHeader: types.BlockHeader{
165+ Height: 101,
166+ },
167+ },
168+ peerID: testPeer,
169+ },
170+ {
171+ block: &types.Block{
172+ BlockHeader: types.BlockHeader{
173+ Height: 102,
174+ },
175+ },
176+ peerID: testPeer,
177+ },
178+ },
179+ limit: 5,
180+ queueSize: 3,
181+ msgSetSize: 3,
182+ msgCounter: 3,
183+ },
184+ // test DOS
185+ {
186+ blocksMsg: []*blockMsg{
187+ {
188+ block: &types.Block{
189+ BlockHeader: types.BlockHeader{
190+ Version: 1,
191+ Height: 100,
192+ },
193+ },
194+ peerID: testPeer,
195+ },
196+ {
197+ block: &types.Block{
198+ BlockHeader: types.BlockHeader{
199+ Version: 2,
200+ Height: 100,
201+ },
202+ },
203+ peerID: testPeer,
204+ },
205+ {
206+ block: &types.Block{
207+ BlockHeader: types.BlockHeader{
208+ Version: 3,
209+ Height: 100,
210+ },
211+ },
212+ peerID: testPeer,
213+ },
214+ {
215+ block: &types.Block{
216+ BlockHeader: types.BlockHeader{
217+ Version: 4,
218+ Height: 100,
219+ },
220+ },
221+ peerID: testPeer,
222+ },
223+ },
224+ limit: 3,
225+ queueSize: 3,
226+ msgSetSize: 3,
227+ msgCounter: 3,
228+ },
229+
230+ // test msg height does not meet the requirements
231+ {
232+ blocksMsg: []*blockMsg{
233+ {
234+ block: &types.Block{
235+ BlockHeader: types.BlockHeader{
236+ Version: 1,
237+ Height: 98,
238+ },
239+ },
240+ peerID: testPeer,
241+ },
242+ {
243+ block: &types.Block{
244+ BlockHeader: types.BlockHeader{
245+ Version: 2,
246+ Height: 97,
247+ },
248+ },
249+ peerID: testPeer,
250+ },
251+ {
252+ block: &types.Block{
253+ BlockHeader: types.BlockHeader{
254+ Version: 3,
255+ Height: 164,
256+ },
257+ },
258+ peerID: testPeer,
259+ },
260+ {
261+ block: &types.Block{
262+ BlockHeader: types.BlockHeader{
263+ Version: 4,
264+ Height: 165,
265+ },
266+ },
267+ peerID: testPeer,
268+ },
269+ },
270+ limit: 5,
271+ queueSize: 0,
272+ msgSetSize: 0,
273+ msgCounter: 0,
274+ },
275+ }
276+
277+ for i, c := range testCase {
278+ fetcher := newBlockFetcher(newChain(), peers)
279+ for _, msg := range c.blocksMsg {
280+ fetcher.add(msg, c.limit)
281+ }
282+
283+ if fetcher.queue.Size() != c.queueSize {
284+ t.Fatalf("index: %d queue size err got %d: want %d", i, fetcher.queue.Size(), c.queueSize)
285+ }
286+
287+ if len(fetcher.msgSet) != c.msgSetSize {
288+ t.Fatalf("index: %d msg set size err got %d: want %d", i, len(fetcher.msgSet), c.msgSetSize)
289+ }
290+
291+ if fetcher.msgCounter[testPeer] != c.msgCounter {
292+ t.Fatalf("index: %d peer msg counter err got %d: want %d", i, fetcher.msgCounter[testPeer], c.msgCounter)
293+ }
294+ }
295+}
--- a/netsync/consensusmgr/handle.go
+++ b/netsync/consensusmgr/handle.go
@@ -191,6 +191,7 @@ func (m *Manager) removePeer(peerID string) {
191191
192192 //Start consensus manager service.
193193 func (m *Manager) Start() error {
194+ go m.blockFetcher.blockProcessorLoop()
194195 go m.blockProposeMsgBroadcastLoop()
195196 go m.blockSignatureMsgBroadcastLoop()
196197 return nil
Show on old repository browser