diff --git a/main.go b/main.go index ac18f8f3ce..89d67bcacf 100644 --- a/main.go +++ b/main.go @@ -304,7 +304,7 @@ func initP2PNode(ctx *cli.Context, txpoolSvr *proc.TXPoolServer) (*p2pserver.P2P txpoolSvr.RegisterActor(tc.NetActor, p2pPID) hserver.SetNetServerPID(p2pPID) p2p.WaitForPeersStart() - log.Infof("P2P node init success") + log.Infof("P2P init success") return p2p, p2pPID, nil } diff --git a/p2pserver/actor/req/txnpool.go b/p2pserver/actor/req/txnpool.go index f6b3d01aab..c78d002685 100644 --- a/p2pserver/actor/req/txnpool.go +++ b/p2pserver/actor/req/txnpool.go @@ -41,7 +41,7 @@ func SetTxnPoolPid(txnPid *actor.PID) { //add txn to txnpool func AddTransaction(transaction *types.Transaction) { if txnPoolPid == nil { - log.Error("net_server AddTransaction(): txnpool pid is nil") + log.Error("[p2p]net_server AddTransaction(): txnpool pid is nil") return } txReq := &tc.TxReq{ @@ -55,13 +55,13 @@ func AddTransaction(transaction *types.Transaction) { //get txn according to hash func GetTransaction(hash common.Uint256) (*types.Transaction, error) { if txnPoolPid == nil { - log.Error("net_server tx pool pid is nil") - return nil, errors.NewErr("net_server tx pool pid is nil") + log.Warn("[p2p]net_server tx pool pid is nil") + return nil, errors.NewErr("[p2p]net_server tx pool pid is nil") } future := txnPoolPid.RequestFuture(&tc.GetTxnReq{Hash: hash}, txnPoolReqTimeout) result, err := future.Result() if err != nil { - log.Errorf("net_server GetTransaction error: %v\n", err) + log.Warnf("[p2p]net_server GetTransaction error: %v\n", err) return nil, err } return result.(tc.GetTxnRsp).Txn, nil diff --git a/p2pserver/actor/server/actor.go b/p2pserver/actor/server/actor.go index 3c7680a720..ddf69f947d 100644 --- a/p2pserver/actor/server/actor.go +++ b/p2pserver/actor/server/actor.go @@ -51,15 +51,15 @@ func (this *P2PActor) Start() (*actor.PID, error) { func (this *P2PActor) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *actor.Restarting: - log.Info("p2p actor restarting") + log.Warn("[p2p]actor restarting") case *actor.Stopping: - log.Info("p2p actor stopping") + log.Warn("[p2p]actor stopping") case *actor.Stopped: - log.Info("p2p actor stopped") + log.Warn("[p2p]actor stopped") case *actor.Started: - log.Info("p2p actor started") + log.Debug("[p2p]actor started") case *actor.Restart: - log.Info("p2p actor restart") + log.Warn("[p2p]actor restart") case *StopServerReq: this.handleStopServerReq(ctx, msg) case *GetPortReq: @@ -97,7 +97,7 @@ func (this *P2PActor) Receive(ctx actor.Context) { default: err := this.server.Xmit(ctx.Message()) if nil != err { - log.Error("error xmit message ", err.Error(), reflect.TypeOf(ctx.Message())) + log.Warn("[p2p]error xmit message ", err.Error(), reflect.TypeOf(ctx.Message())) } } } @@ -240,6 +240,6 @@ func (this *P2PActor) handleTransmitConsensusMsgReq(ctx actor.Context, req *Tran if peer != nil { this.server.Send(peer, req.Msg, true) } else { - log.Errorf("handleTransmit consensus msg failed:%d", req.Target) + log.Warnf("[p2p]can`t transmit consensus msg:no valid neighbor peer: %d\n", req.Target) } } diff --git a/p2pserver/block_sync.go b/p2pserver/block_sync.go index 1e1e48e35d..5481815aa7 100644 --- a/p2pserver/block_sync.go +++ b/p2pserver/block_sync.go @@ -292,7 +292,7 @@ func (this *BlockSyncMgr) checkTimeout() { } flightInfo.ResetStartTime() flightInfo.MarkFailedNode() - log.Infof("checkTimeout sync headers:%d timeout after:%d s Times:%d", height, SYNC_HEADER_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes()) + log.Debugf("[p2p]checkTimeout sync headers:%d timeout after:%d s Times:%d", height, SYNC_HEADER_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes()) reqNode := this.getNodeWithMinFailedTimes(flightInfo, curBlockHeight) if reqNode == nil { break @@ -303,7 +303,7 @@ func (this *BlockSyncMgr) checkTimeout() { msg := msgpack.NewHeadersReq(headerHash) err := this.server.Send(reqNode, msg, false) if err != nil { - log.Error("checkTimeout failed build a new headersReq") + log.Warn("[p2p]checkTimeout failed to send a new headersReq:s", err) } else { this.appendReqTime(reqNode.GetID()) } @@ -317,7 +317,7 @@ func (this *BlockSyncMgr) checkTimeout() { } flightInfo.ResetStartTime() flightInfo.MarkFailedNode() - log.Debugf("checkTimeout sync height:%d block:0x%x timeout after:%d s times:%d", flightInfo.Height, blockHash, SYNC_BLOCK_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes()) + log.Debugf("[p2p]checkTimeout sync height:%d block:0x%x timeout after:%d s times:%d", flightInfo.Height, blockHash, SYNC_BLOCK_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes()) reqNode := this.getNodeWithMinFailedTimes(flightInfo, curBlockHeight) if reqNode == nil { break @@ -327,15 +327,11 @@ func (this *BlockSyncMgr) checkTimeout() { msg := msgpack.NewBlkDataReq(blockHash) err := this.server.Send(reqNode, msg, false) if err != nil { - log.Error("checkTimeout NewBlkDataReq error:", err) + log.Warnf("[p2p]checkTimeout reqNode ID:%d Send error:%s", reqNode.GetID(), err) + continue } else { this.appendReqTime(reqNode.GetID()) } - - if err != nil { - log.Errorf("checkTimeout reqNode ID:%d Send error:%s", reqNode.GetID(), err) - continue - } } } } @@ -375,12 +371,12 @@ func (this *BlockSyncMgr) syncHeader() { msg := msgpack.NewHeadersReq(headerHash) err := this.server.Send(reqNode, msg, false) if err != nil { - log.Error("syncHeader failed build a new headersReq") + log.Warn("[p2p]syncHeader failed to send a new headersReq") } else { this.appendReqTime(reqNode.GetID()) } - log.Infof("syncHeader request Height:%d", NextHeaderId) + log.Infof("Header sync request height:%d", NextHeaderId) } func (this *BlockSyncMgr) syncBlock() { @@ -443,7 +439,7 @@ func (this *BlockSyncMgr) syncBlock() { msg := msgpack.NewBlkDataReq(nextBlockHash) err := this.server.Send(reqNode, msg, false) if err != nil { - log.Errorf("syncBlock Height:%d ReqBlkData error:%s", nextBlockHeight, err) + log.Warnf("[p2p]syncBlock Height:%d ReqBlkData error:%s", nextBlockHeight, err) return } else { this.appendReqTime(reqNode.GetID()) @@ -459,7 +455,7 @@ func (this *BlockSyncMgr) OnHeaderReceive(fromID uint64, headers []*types.Header if len(headers) == 0 { return } - log.Infof("OnHeaderReceive Height:%d - %d", headers[0].Height, headers[len(headers)-1].Height) + log.Infof("Header receive height:%d - %d", headers[0].Height, headers[len(headers)-1].Height) height := headers[0].Height curHeaderHeight := this.ledger.GetCurrentHeaderHeight() @@ -478,7 +474,7 @@ func (this *BlockSyncMgr) OnHeaderReceive(fromID uint64, headers []*types.Header if n != nil && n.GetErrorRespCnt() >= SYNC_MAX_ERROR_RESP_TIMES { this.delNode(fromID) } - log.Errorf("OnHeaderReceive AddHeaders error:%s", err) + log.Warnf("[p2p]OnHeaderReceive AddHeaders error:%s", err) return } this.syncHeader() @@ -488,7 +484,7 @@ func (this *BlockSyncMgr) OnHeaderReceive(fromID uint64, headers []*types.Header func (this *BlockSyncMgr) OnBlockReceive(fromID uint64, blockSize uint32, block *types.Block) { height := block.Header.Height blockHash := block.Hash() - log.Debugf("OnBlockReceive Height:%d", height) + log.Debugf("[p2p]OnBlockReceive Height:%d", height) flightInfos := this.flightBlocks[blockHash] for _, flightInfo := range flightInfos { if flightInfo.GetNodeId() == fromID { @@ -517,7 +513,7 @@ func (this *BlockSyncMgr) OnBlockReceive(fromID uint64, blockSize uint32, block //OnAddNode to node list when a new node added func (this *BlockSyncMgr) OnAddNode(nodeId uint64) { - log.Infof("OnAddNode:%d", nodeId) + log.Debugf("[p2p]OnAddNode:%d", nodeId) this.lock.Lock() defer this.lock.Unlock() w := NewNodeWeight(nodeId) @@ -539,6 +535,7 @@ func (this *BlockSyncMgr) delNode(nodeId uint64) { if len(this.nodeWeights) == 0 { log.Warnf("no sync nodes") } + log.Infof("OnDelNode:%d", nodeId) } func (this *BlockSyncMgr) tryGetSyncHeaderLock() bool { @@ -643,7 +640,7 @@ func (this *BlockSyncMgr) saveBlock() { if n != nil && n.GetErrorRespCnt() >= SYNC_MAX_ERROR_RESP_TIMES { this.delNode(fromID) } - log.Warnf("saveBlock Height:%d AddBlock error:%s", nextBlockHeight, err) + log.Warnf("[p2p]saveBlock Height:%d AddBlock error:%s", nextBlockHeight, err) reqNode := this.getNextNode(nextBlockHeight) if reqNode == nil { return @@ -652,7 +649,7 @@ func (this *BlockSyncMgr) saveBlock() { msg := msgpack.NewBlkDataReq(nextBlock.Hash()) err := this.server.Send(reqNode, msg, false) if err != nil { - log.Error("syncBlock error:", err) + log.Warn("[p2p]require new block error:", err) return } else { this.appendReqTime(reqNode.GetID()) diff --git a/p2pserver/common/p2p_common.go b/p2pserver/common/p2p_common.go index 6ca338f6a4..403a7b3af1 100644 --- a/p2pserver/common/p2p_common.go +++ b/p2pserver/common/p2p_common.go @@ -20,6 +20,7 @@ package common import ( "errors" + "strconv" "strings" "github.com/ontio/ontology/core/types" @@ -153,7 +154,7 @@ type AppendBlock struct { func ParseIPAddr(s string) (string, error) { i := strings.Index(s, ":") if i < 0 { - return s, errors.New("split ip address error") + return "", errors.New("[p2p]split ip address error") } return s[:i], nil } @@ -162,7 +163,14 @@ func ParseIPAddr(s string) (string, error) { func ParseIPPort(s string) (string, error) { i := strings.Index(s, ":") if i < 0 { - return s, errors.New("split ip port error") + return "", errors.New("[p2p]split ip port error") + } + port, err := strconv.Atoi(s[i+1:]) + if err != nil { + return "", errors.New("[p2p]parse port error") + } + if port <= 0 || port >= 65535 { + return "", errors.New("[p2p]port out of bound") } return s[i:], nil } diff --git a/p2pserver/link/link.go b/p2pserver/link/link.go index 6a46e751cd..a01daf8eb6 100644 --- a/p2pserver/link/link.go +++ b/p2pserver/link/link.go @@ -115,7 +115,7 @@ func (this *Link) Rx() { for { msg, payloadSize, err := types.ReadMessage(reader) if err != nil { - log.Error("read connection error ", err) + log.Infof("[p2p]error read from %s :%s", this.GetAddr(), err.Error()) break } @@ -141,6 +141,7 @@ func (this *Link) Rx() { //disconnectNotify push disconnect msg to channel func (this *Link) disconnectNotify() { + log.Debugf("[p2p]call disconnectNotify for %s\n", this.GetAddr()) this.CloseConn() msg, _ := types.MakeEmptyMessage(common.DISCONNECT_TYPE) @@ -163,18 +164,18 @@ func (this *Link) CloseConn() { func (this *Link) Tx(msg types.Message) error { conn := this.conn if conn == nil { - return errors.New("tx link invalid") + return errors.New("[p2p]tx link invalid") } buf := bytes.NewBuffer(nil) err := types.WriteMessage(buf, msg) if err != nil { - log.Error("error serialize messge ", err.Error()) + log.Debugf("[p2p]error serialize messge ", err.Error()) return err } payload := buf.Bytes() nByteCnt := len(payload) - log.Debugf("TX buf length: %d\n", nByteCnt) + log.Debugf("[p2p]TX buf length: %d\n", nByteCnt) nCount := nByteCnt / common.PER_SEND_LEN if nCount == 0 { @@ -183,7 +184,7 @@ func (this *Link) Tx(msg types.Message) error { conn.SetWriteDeadline(time.Now().Add(time.Duration(nCount*common.WRITE_DEADLINE) * time.Second)) _, err = conn.Write(payload) if err != nil { - log.Error("error sending messge to peer node ", err.Error()) + log.Infof("[p2p]error sending messge to %s :%s", this.GetAddr(), err.Error()) this.disconnectNotify() return err } diff --git a/p2pserver/message/msg_pack/msg_pack.go b/p2pserver/message/msg_pack/msg_pack.go index 0c0a06a4af..9386c25a68 100644 --- a/p2pserver/message/msg_pack/msg_pack.go +++ b/p2pserver/message/msg_pack/msg_pack.go @@ -32,6 +32,7 @@ import ( //Peer address package func NewAddrs(nodeAddrs []msgCommon.PeerAddr) mt.Message { + log.Debug() var addr mt.Addr addr.NodeAddrs = nodeAddrs @@ -40,6 +41,7 @@ func NewAddrs(nodeAddrs []msgCommon.PeerAddr) mt.Message { //Peer address request package func NewAddrReq() mt.Message { + log.Debug() var msg mt.AddrReq return &msg } @@ -55,6 +57,7 @@ func NewBlock(bk *ct.Block) mt.Message { //blk hdr package func NewHeaders(headers []*ct.Header) mt.Message { + log.Debug() var blkHdr mt.BlkHeader blkHdr.BlkHdr = headers @@ -63,6 +66,7 @@ func NewHeaders(headers []*ct.Header) mt.Message { //blk hdr req package func NewHeadersReq(curHdrHash common.Uint256) mt.Message { + log.Debug() var h mt.HeadersReq h.Len = 1 h.HashEnd = curHdrHash @@ -81,6 +85,7 @@ func NewConsensus(cp *mt.ConsensusPayload) mt.Message { //InvPayload func NewInvPayload(invType common.InventoryType, msg []common.Uint256) *mt.InvPayload { + log.Debug() return &mt.InvPayload{ InvType: invType, Blk: msg, @@ -89,6 +94,7 @@ func NewInvPayload(invType common.InventoryType, msg []common.Uint256) *mt.InvPa //Inv request package func NewInv(invPayload *mt.InvPayload) mt.Message { + log.Debug() var inv mt.Inv inv.P.Blk = invPayload.Blk inv.P.InvType = invPayload.InvType @@ -134,6 +140,7 @@ func NewTxn(txn *ct.Transaction) mt.Message { //version ack package func NewVerAck(isConsensus bool) mt.Message { + log.Debug() var verAck mt.VerACK verAck.IsConsensus = isConsensus @@ -142,6 +149,7 @@ func NewVerAck(isConsensus bool) mt.Message { //Version package func NewVersion(n p2pnet.P2P, isCons bool, height uint32) mt.Message { + log.Debug() var version mt.Version version.P = mt.VersionPayload{ Version: n.GetVersion(), @@ -170,6 +178,7 @@ func NewVersion(n p2pnet.P2P, isCons bool, height uint32) mt.Message { //transaction request package func NewTxnDataReq(hash common.Uint256) mt.Message { + log.Debug() var dataReq mt.DataReq dataReq.DataType = common.TRANSACTION dataReq.Hash = hash @@ -179,6 +188,7 @@ func NewTxnDataReq(hash common.Uint256) mt.Message { //block request package func NewBlkDataReq(hash common.Uint256) mt.Message { + log.Debug() var dataReq mt.DataReq dataReq.DataType = common.BLOCK dataReq.Hash = hash @@ -188,6 +198,7 @@ func NewBlkDataReq(hash common.Uint256) mt.Message { //consensus request package func NewConsensusDataReq(hash common.Uint256) mt.Message { + log.Debug() var dataReq mt.DataReq dataReq.DataType = common.CONSENSUS dataReq.Hash = hash diff --git a/p2pserver/message/utils/msg_handler.go b/p2pserver/message/utils/msg_handler.go index 0b62fd6ba0..9e6e49e395 100644 --- a/p2pserver/message/utils/msg_handler.go +++ b/p2pserver/message/utils/msg_handler.go @@ -45,10 +45,10 @@ var respCache *lru.ARCCache // AddrReqHandle handles the neighbor address request from peer func AddrReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive addr request message", data.Addr, data.Id) + log.Debug("[p2p]receive addr request message", data.Addr, data.Id) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Error("remotePeer invalid in AddrReqHandle") + log.Debug("[p2p]remotePeer invalid in AddrReqHandle") return } @@ -74,14 +74,14 @@ func AddrReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar msg := msgpack.NewAddrs(addrStr) err := p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } // HeaderReqHandle handles the header sync req from peer func HeadersReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive headers request message", data.Addr, data.Id) + log.Debug("[p2p]receive headers request message", data.Addr, data.Id) headersReq := data.Payload.(*msgTypes.HeadersReq) @@ -90,30 +90,30 @@ func HeadersReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, headers, err := GetHeadersFromHash(startHash, stopHash) if err != nil { - log.Errorf("get headers in HeadersReqHandle error: %s,startHash:%s,stopHash:%s", err.Error(), startHash.ToHexString(), stopHash.ToHexString()) + log.Warnf("get headers in HeadersReqHandle error: %s,startHash:%s,stopHash:%s", err.Error(), startHash.ToHexString(), stopHash.ToHexString()) return } remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Errorf("remotePeer invalid in HeadersReqHandle, peer id: %d", data.Id) + log.Debugf("[p2p]remotePeer invalid in HeadersReqHandle, peer id: %d", data.Id) return } msg := msgpack.NewHeaders(headers) err = p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } //PingHandle handle ping msg from peer func PingHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive ping message", data.Addr, data.Id) + log.Debug("[p2p]receive ping message", data.Addr, data.Id) ping := data.Payload.(*msgTypes.Ping) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Error("remotePeer invalid in PingHandle") + log.Debug("[p2p]remotePeer invalid in PingHandle") return } remotePeer.SetHeight(ping.Height) @@ -124,19 +124,19 @@ func PingHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args err := p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) } } ///PongHandle handle pong msg from peer func PongHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive pong message", data.Addr, data.Id) + log.Debug("[p2p]receive pong message", data.Addr, data.Id) pong := data.Payload.(*msgTypes.Pong) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Error("remotePeer invalid in PongHandle") + log.Debug("[p2p]remotePeer invalid in PongHandle") return } remotePeer.SetHeight(pong.Height) @@ -144,7 +144,7 @@ func PongHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args // BlkHeaderHandle handles the sync headers from peer func BlkHeaderHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive block header message", data.Addr, data.Id) + log.Debug("[p2p]receive block header message", data.Addr, data.Id) if pid != nil { var blkHeader = data.Payload.(*msgTypes.BlkHeader) input := &msgCommon.AppendHeaders{ @@ -157,7 +157,7 @@ func BlkHeaderHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, // BlockHandle handles the block message from peer func BlockHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive block message from ", data.Addr, data.Id) + log.Debug("[p2p]receive block message from ", data.Addr, data.Id) if pid != nil { var block = data.Payload.(*msgTypes.Block) @@ -172,12 +172,12 @@ func BlockHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args // ConsensusHandle handles the consensus message from peer func ConsensusHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debugf("receive consensus message:%v,%d", data.Addr, data.Id) + log.Debugf("[p2p]receive consensus message:%v,%d", data.Addr, data.Id) if actor.ConsensusPid != nil { var consensus = data.Payload.(*msgTypes.Consensus) if err := consensus.Cons.Verify(); err != nil { - log.Error(err) + log.Warn(err) return } consensus.Cons.PeerId = data.Id @@ -188,28 +188,28 @@ func ConsensusHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, // NotFoundHandle handles the not found message from peer func NotFoundHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { var notFound = data.Payload.(*msgTypes.NotFound) - log.Debug("receive notFound message, hash is ", notFound.Hash) + log.Debug("[p2p]receive notFound message, hash is ", notFound.Hash) } // TransactionHandle handles the transaction message from peer func TransactionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive transaction message", data.Addr, data.Id) + log.Debug("[p2p]receive transaction message", data.Addr, data.Id) var trn = data.Payload.(*msgTypes.Trn) actor.AddTransaction(trn.Txn) - log.Debug("receive Transaction message hash", trn.Txn.Hash()) + log.Debug("[p2p]receive Transaction message hash", trn.Txn.Hash()) } // VersionHandle handles version handshake protocol from peer func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive version message", data.Addr, data.Id) + log.Debug("[p2p]receive version message", data.Addr, data.Id) version := data.Payload.(*msgTypes.Version) remotePeer := p2p.GetPeerFromAddr(data.Addr) if remotePeer == nil { - log.Warn("peer is not exist", data.Addr) + log.Debug("[p2p]peer is not exist", data.Addr) //peer not exist,just remove list and return p2p.RemoveFromConnectingList(data.Addr) return @@ -225,7 +225,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar found := false for _, addr := range config.DefConfig.P2PNode.ReservedCfg.ReservedPeers { if strings.HasPrefix(data.Addr, addr) { - log.Info("peer in reserved list") + log.Debug("[p2p]peer in reserved list", data.Addr) found = true break } @@ -233,7 +233,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar if !found { remotePeer.CloseSync() remotePeer.CloseCons() - log.Info("peer not in reserved list,close") + log.Debug("[p2p]peer not in reserved list,close", data.Addr) return } @@ -241,7 +241,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar if version.P.IsConsensus == true { if config.DefConfig.P2PNode.DualPortSupport == false { - log.Warn("consensus port not surpport") + log.Warn("[p2p]consensus port not surpport", data.Addr) remotePeer.CloseCons() return } @@ -249,7 +249,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar p := p2p.GetPeer(version.P.Nonce) if p == nil { - log.Warn("sync link is not exist", version.P.Nonce) + log.Warn("[p2p]sync link is not exist", version.P.Nonce, data.Addr) remotePeer.CloseCons() remotePeer.CloseSync() return @@ -262,7 +262,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar } if version.P.Nonce == p2p.GetID() { - log.Warn("the node handshake with itself") + log.Warn("[p2p]the node handshake with itself", data.Addr) p2p.SetOwnAddress(nodeAddr) p2p.RemoveFromInConnRecord(remotePeer.GetAddr()) p2p.RemoveFromOutConnRecord(remotePeer.GetAddr()) @@ -272,7 +272,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar s := remotePeer.GetConsState() if s != msgCommon.INIT && s != msgCommon.HAND { - log.Warn("unknown status to received version", s) + log.Warnf("[p2p]unknown status to received version,%d,%s\n", s, data.Addr) remotePeer.CloseCons() return } @@ -294,14 +294,14 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar } err := p2p.Send(remotePeer, msg, true) if err != nil { - log.Error(err) + log.Warn(err) return } } else { if version.P.Nonce == p2p.GetID() { p2p.RemoveFromInConnRecord(remotePeer.GetAddr()) p2p.RemoveFromOutConnRecord(remotePeer.GetAddr()) - log.Warn("the node handshake with itself") + log.Warn("[p2p]the node handshake with itself", remotePeer.GetAddr()) p2p.SetOwnAddress(nodeAddr) remotePeer.CloseSync() return @@ -309,7 +309,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar s := remotePeer.GetSyncState() if s != msgCommon.INIT && s != msgCommon.HAND { - log.Warn("unknown status to received version", s) + log.Warnf("[p2p]unknown status to received version,%d,%s\n", s, remotePeer.GetAddr()) remotePeer.CloseSync() return } @@ -319,20 +319,20 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar if p != nil { ipOld, err := msgCommon.ParseIPAddr(p.GetAddr()) if err != nil { - log.Warn("exist peer %d ip format is wrong %s", version.P.Nonce, p.GetAddr()) + log.Warn("[p2p]exist peer %d ip format is wrong %s", version.P.Nonce, p.GetAddr()) return } ipNew, err := msgCommon.ParseIPAddr(data.Addr) if err != nil { remotePeer.CloseSync() - log.Warn("connecting peer %d ip format is wrong %s, close", version.P.Nonce, data.Addr) + log.Warn("[p2p]connecting peer %d ip format is wrong %s, close", version.P.Nonce, data.Addr) return } if ipNew == ipOld { //same id and same ip n, ret := p2p.DelNbrNode(version.P.Nonce) if ret == true { - log.Infof("peer reconnect %d", version.P.Nonce) + log.Infof("[p2p]peer reconnect %d", version.P.Nonce, data.Addr) // Close the connection and release the node source n.CloseSync() n.CloseCons() @@ -344,7 +344,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar } } } else { - log.Infof("same peer id from different addr: %s, %s close latest one", ipOld, ipNew) + log.Warnf("[p2p]same peer id from different addr: %s, %s close latest one", ipOld, ipNew) remotePeer.CloseSync() return @@ -382,7 +382,7 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar } err := p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } @@ -390,24 +390,24 @@ func VersionHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar // VerAckHandle handles the version ack from peer func VerAckHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive verAck message from ", data.Addr, data.Id) + log.Debug("[p2p]receive verAck message from ", data.Addr, data.Id) verAck := data.Payload.(*msgTypes.VerACK) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Warn("nbr node is not exist", data.Id, data.Addr) + log.Warn("[p2p]nbr node is not exist", data.Id, data.Addr) return } if verAck.IsConsensus == true { if config.DefConfig.P2PNode.DualPortSupport == false { - log.Warn("consensus port not surpport") + log.Warn("[p2p]consensus port not surpport") return } s := remotePeer.GetConsState() if s != msgCommon.HAND_SHAKE && s != msgCommon.HAND_SHAKED { - log.Warn("unknown status to received verAck", s) + log.Warnf("[p2p]unknown status to received verAck,state:%d,%s\n", s, data.Addr) return } @@ -422,7 +422,7 @@ func VerAckHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, arg } else { s := remotePeer.GetSyncState() if s != msgCommon.HAND_SHAKE && s != msgCommon.HAND_SHAKED { - log.Warn("unknown status to received verAck", s) + log.Warnf("[p2p]unknown status to received verAck,state:%d,%s\n", s, data.Addr) return } @@ -457,7 +457,7 @@ func VerAckHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, arg // AddrHandle handles the neighbor address response message from peer func AddrHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("handle addr message", data.Addr, data.Id) + log.Debug("[p2p]handle addr message", data.Addr, data.Id) var msg = data.Payload.(*msgTypes.Addr) for _, v := range msg.NodeAddrs { @@ -483,20 +483,20 @@ func AddrHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args if p2p.IsAddrFromConnecting(address) { continue } - log.Debug("connect ip address:", address) + log.Debug("[p2p]connect ip address:", address) go p2p.Connect(address, false) } } // DataReqHandle handles the data req(block/Transaction) from peer func DataReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive data req message", data.Addr, data.Id) + log.Debug("[p2p]receive data req message", data.Addr, data.Id) var dataReq = data.Payload.(*msgTypes.DataReq) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Error("remotePeer invalid in DataReqHandle") + log.Debug("[p2p]remotePeer invalid in DataReqHandle") return } reqType := common.InventoryType(dataReq.DataType) @@ -516,43 +516,43 @@ func DataReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar if block == nil { block, err = ledger.DefLedger.GetBlockByHash(hash) if err != nil || block == nil || block.Header == nil { - log.Debug("can't get block by hash: ", hash, + log.Debug("[p2p]can't get block by hash: ", hash, " ,send not found message") msg := msgpack.NewNotFound(hash) err := p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } return } saveRespCache(reqID, block) } - log.Debug("block height is ", block.Header.Height, + log.Debug("[p2p]block height is ", block.Header.Height, " ,hash is ", hash) msg := msgpack.NewBlock(block) err = p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } case common.TRANSACTION: txn, err := ledger.DefLedger.GetTransaction(hash) if err != nil { - log.Debug("Can't get transaction by hash: ", + log.Debug("[p2p]Can't get transaction by hash: ", hash, " ,send not found message") msg := msgpack.NewNotFound(hash) err = p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } msg := msgpack.NewTxn(txn) err = p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } @@ -561,27 +561,27 @@ func DataReqHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, ar // InvHandle handles the inventory message(block, // transaction and consensus) from peer. func InvHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { - log.Debug("receive inv message", data.Addr, data.Id) + log.Debug("[p2p]receive inv message", data.Addr, data.Id) var inv = data.Payload.(*msgTypes.Inv) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { - log.Error("remotePeer invalid in InvHandle") + log.Debug("[p2p]remotePeer invalid in InvHandle") return } if len(inv.P.Blk) == 0 { - log.Error("empty inv payload in InvHandle") + log.Debug("[p2p]empty inv payload in InvHandle") return } var id common.Uint256 str := inv.P.Blk[0].ToHexString() - log.Debugf("the inv type: 0x%x block len: %d, %s\n", + log.Debugf("[p2p]the inv type: 0x%x block len: %d, %s\n", inv.P.InvType, len(inv.P.Blk), str) invType := common.InventoryType(inv.P.InvType) switch invType { case common.TRANSACTION: - log.Debug("receive transaction message", id) + log.Debug("[p2p]receive transaction message", id) // TODO check the ID queue id = inv.P.Blk[0] trn, err := ledger.DefLedger.GetTransaction(id) @@ -589,53 +589,55 @@ func InvHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args . msg := msgpack.NewTxnDataReq(id) err = p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } case common.BLOCK: - log.Debug("receive block message") + log.Debug("[p2p]receive block message") for _, id = range inv.P.Blk { - log.Debug("receive inv-block message, hash is ", id) + log.Debug("[p2p]receive inv-block message, hash is ", id) // TODO check the ID queue isContainBlock, err := ledger.DefLedger.IsContainBlock(id) if err != nil { - log.Error(err) + log.Warn(err) return } if !isContainBlock && msgTypes.LastInvHash != id { msgTypes.LastInvHash = id // send the block request - log.Infof("inv request block hash: %x", id) + log.Infof("[p2p]inv request block hash: %x", id) msg := msgpack.NewBlkDataReq(id) err = p2p.Send(remotePeer, msg, false) if err != nil { - log.Error(err) + log.Warn(err) return } } } case common.CONSENSUS: - log.Debug("receive consensus message") + log.Debug("[p2p]receive consensus message") id = inv.P.Blk[0] msg := msgpack.NewConsensusDataReq(id) err := p2p.Send(remotePeer, msg, true) if err != nil { - log.Error(err) + log.Warn(err) return } default: - log.Warn("receive unknown inventory message") + log.Warn("[p2p]receive unknown inventory message") } } // DisconnectHandle handles the disconnect events func DisconnectHandle(data *msgTypes.MsgPayload, p2p p2p.P2P, pid *evtActor.PID, args ...interface{}) { + log.Debug("[p2p]receive disconnect message", data.Addr, data.Id) p2p.RemoveFromInConnRecord(data.Addr) p2p.RemoveFromOutConnRecord(data.Addr) remotePeer := p2p.GetPeer(data.Id) if remotePeer == nil { + log.Debug("[p2p]disconnect peer is nil") return } p2p.RemoveFromConnectingList(data.Addr) @@ -692,7 +694,7 @@ func GetHeadersFromHash(startHash common.Uint256, stopHash common.Uint256) ([]*t // avoid unsigned integer underflow if startHeight < stopHeight { - return nil, errors.New("do not have header to send") + return nil, errors.New("[p2p]do not have header to send") } count = startHeight - stopHeight @@ -715,7 +717,7 @@ func GetHeadersFromHash(startHash common.Uint256, stopHash common.Uint256) ([]*t hash := ledger.DefLedger.GetBlockHash(stopHeight + i) hd, err := ledger.DefLedger.GetHeaderByHash(hash) if err != nil { - log.Errorf("net_server GetBlockWithHeight failed with err=%s, hash=%x,height=%d\n", err.Error(), hash, stopHeight+i) + log.Debugf("[p2p]net_server GetBlockWithHeight failed with err=%s, hash=%x,height=%d\n", err.Error(), hash, stopHeight+i) return nil, err } headers = append(headers, hd) diff --git a/p2pserver/message/utils/msg_router.go b/p2pserver/message/utils/msg_router.go index 4455328281..d52a51868b 100644 --- a/p2pserver/message/utils/msg_router.go +++ b/p2pserver/message/utils/msg_router.go @@ -96,7 +96,7 @@ func (this *MessageRouter) SetPID(pid *actor.PID) { func (this *MessageRouter) Start() { go this.hookChan(this.RecvSyncChan, this.stopSyncCh) go this.hookChan(this.RecvConsChan, this.stopConsCh) - log.Info("MessageRouter start to parse p2p message...") + log.Debug("[p2p]MessageRouter start to parse p2p message...") } // hookChan loops to handle the message from the network @@ -112,7 +112,7 @@ func (this *MessageRouter) hookChan(channel chan *types.MsgPayload, if ok { go handler(data, this.p2p, this.pid) } else { - log.Info("unknown message handler for the msg: ", + log.Warn("unknown message handler for the msg: ", msgType) } } diff --git a/p2pserver/net/netserver/net_utils.go b/p2pserver/net/netserver/net_utils.go index f45172a101..c17b87b094 100644 --- a/p2pserver/net/netserver/net_utils.go +++ b/p2pserver/net/netserver/net_utils.go @@ -41,14 +41,14 @@ func createListener(port uint16) (net.Listener, error) { if isTls { listener, err = initTlsListen(port) if err != nil { - log.Error("initTlslisten failed") - return nil, errors.New("initTlslisten failed") + log.Error("[p2p]initTlslisten failed") + return nil, errors.New("[p2p]initTlslisten failed") } } else { listener, err = initNonTlsListen(port) if err != nil { - log.Error("initNonTlsListen failed") - return nil, errors.New("initNonTlsListen failed") + log.Error("[p2p]initNonTlsListen failed") + return nil, errors.New("[p2p]initNonTlsListen failed") } } return listener, nil @@ -74,7 +74,7 @@ func TLSDial(nodeAddr string) (net.Conn, error) { cacert, err := ioutil.ReadFile(CAPath) if err != nil { - log.Error("load CA file fail", err) + log.Error("[p2p]load CA file fail", err) return nil, err } cert, err := tls.LoadX509KeyPair(CertPath, KeyPath) @@ -84,7 +84,7 @@ func TLSDial(nodeAddr string) (net.Conn, error) { ret := clientCertPool.AppendCertsFromPEM(cacert) if !ret { - return nil, errors.New("failed to parse root certificate") + return nil, errors.New("[p2p]failed to parse root certificate") } conf := &tls.Config{ @@ -106,7 +106,7 @@ func initNonTlsListen(port uint16) (net.Listener, error) { log.Debug() listener, err := net.Listen("tcp", ":"+strconv.Itoa(int(port))) if err != nil { - log.Error("Error listening\n", err.Error()) + log.Error("[p2p]Error listening\n", err.Error()) return nil, err } return listener, nil @@ -121,19 +121,19 @@ func initTlsListen(port uint16) (net.Listener, error) { // load cert cert, err := tls.LoadX509KeyPair(CertPath, KeyPath) if err != nil { - log.Error("load keys fail", err) + log.Error("[p2p]load keys fail", err) return nil, err } // load root ca caData, err := ioutil.ReadFile(CAPath) if err != nil { - log.Error("read ca fail", err) + log.Error("[p2p]read ca fail", err) return nil, err } pool := x509.NewCertPool() ret := pool.AppendCertsFromPEM(caData) if !ret { - return nil, errors.New("failed to parse root certificate") + return nil, errors.New("[p2p]failed to parse root certificate") } tlsConfig := &tls.Config{ @@ -143,7 +143,7 @@ func initTlsListen(port uint16) (net.Listener, error) { ClientCAs: pool, } - log.Info("TLS listen port is ", strconv.Itoa(int(port))) + log.Info("[p2p]TLS listen port is ", strconv.Itoa(int(port))) listener, err := tls.Listen("tcp", ":"+strconv.Itoa(int(port)), tlsConfig) if err != nil { log.Error(err) diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go index 0dc9f09b2f..5c5f5c9e87 100644 --- a/p2pserver/net/netserver/netserver.go +++ b/p2pserver/net/netserver/netserver.go @@ -20,7 +20,6 @@ package netserver import ( "errors" - "fmt" "math/rand" "net" "strings" @@ -103,16 +102,16 @@ func (this *NetServer) init() error { } if config.DefConfig.P2PNode.NodePort == 0 { - log.Error("link port invalid") - return errors.New("invalid link port") + log.Error("[p2p]link port invalid") + return errors.New("[p2p]invalid link port") } this.base.SetSyncPort(uint16(config.DefConfig.P2PNode.NodePort)) if config.DefConfig.P2PNode.DualPortSupport { if config.DefConfig.P2PNode.NodeConsensusPort == 0 { - log.Error("consensus port invalid") - return errors.New("invalid consensus port") + log.Error("[p2p]consensus port invalid") + return errors.New("[p2p]invalid consensus port") } this.base.SetConsPort(uint16(config.DefConfig.P2PNode.NodeConsensusPort)) @@ -127,7 +126,7 @@ func (this *NetServer) init() error { this.base.SetID(id) - log.Infof("init peer ID to %d", this.base.GetID()) + log.Infof("[p2p]init peer ID to %d", this.base.GetID()) this.Np = &peer.NbrPeers{} this.Np.Init() @@ -252,8 +251,8 @@ func (this *NetServer) Send(p *peer.Peer, msg types.Message, isConsensus bool) e } return p.Send(msg, isConsensus) } - log.Error("send to a invalid peer") - return errors.New("send to a invalid peer") + log.Warn("[p2p]send to a invalid peer") + return errors.New("[p2p]send to a invalid peer") } //IsPeerEstablished return the establise state of given peer`s id @@ -267,7 +266,7 @@ func (this *NetServer) IsPeerEstablished(p *peer.Peer) bool { //Connect used to connect net address under sync or cons mode func (this *NetServer) Connect(addr string, isConsensus bool) error { if this.IsAddrInOutConnRecord(addr) { - log.Error("Addr is in OutConnectionRecord") + log.Debugf("[p2p]Address: %s Consensus: %v is in OutConnectionRecord,", addr, isConsensus) return nil } if this.IsOwnAddress(addr) { @@ -280,10 +279,10 @@ func (this *NetServer) Connect(addr string, isConsensus bool) error { this.connectLock.Lock() connCount := uint(this.GetOutConnRecordLen()) if connCount >= config.DefConfig.P2PNode.MaxConnOutBound { - log.Warnf("Connect: out connections(%d) reach the max limit(%d)", connCount, + log.Warnf("[p2p]Connect: out connections(%d) reach the max limit(%d)", connCount, config.DefConfig.P2PNode.MaxConnOutBound) this.connectLock.Unlock() - return errors.New("connect: out connections reach the max limit") + return errors.New("[p2p]connect: out connections reach the max limit") } this.connectLock.Unlock() @@ -292,15 +291,7 @@ func (this *NetServer) Connect(addr string, isConsensus bool) error { } this.connectLock.Lock() if added := this.AddOutConnectingList(addr); added == false { - p := this.GetPeerFromAddr(addr) - if p != nil { - if p.SyncLink.Valid() { - log.Info("node exist in connecting list", addr) - this.connectLock.Unlock() - return errors.New("node exist in connecting list") - } - } - this.RemoveFromConnectingList(addr) + log.Debug("[p2p]node exist in connecting list", addr) } this.connectLock.Unlock() @@ -312,22 +303,22 @@ func (this *NetServer) Connect(addr string, isConsensus bool) error { conn, err = TLSDial(addr) if err != nil { this.RemoveFromConnectingList(addr) - log.Debug("connect failed: ", err) + log.Debugf("[p2p]connect %s failed:%s \n", addr, err.Error()) return err } } else { conn, err = nonTLSDial(addr) if err != nil { this.RemoveFromConnectingList(addr) - log.Debug("connect failed: ", err) + log.Debugf("[p2p]connect %s failed:%s \n", addr, err.Error()) return err } } addr = conn.RemoteAddr().String() - log.Info(fmt.Sprintf("peer %s connect with %s with %s", + log.Debugf("[p2p]peer %s connect with %s with %s", conn.LocalAddr().String(), conn.RemoteAddr().String(), - conn.RemoteAddr().Network())) + conn.RemoteAddr().Network()) if !isConsensus { this.AddOutConnRecord(addr) @@ -354,7 +345,7 @@ func (this *NetServer) Connect(addr string, isConsensus bool) error { if !isConsensus { this.RemoveFromOutConnRecord(addr) } - log.Error(err) + log.Warn(err) return err } return nil @@ -383,24 +374,24 @@ func (this *NetServer) startListening() error { consPort := this.base.GetConsPort() if syncPort == 0 { - log.Error("sync port invalid") - return errors.New("sync port invalid") + log.Error("[p2p]sync port invalid") + return errors.New("[p2p]sync port invalid") } err = this.startSyncListening(syncPort) if err != nil { - log.Error("start sync listening fail") + log.Error("[p2p]start sync listening fail") return err } //consensus if config.DefConfig.P2PNode.DualPortSupport == false { - log.Info("dual port mode not supported,keep single link") + log.Debug("[p2p]dual port mode not supported,keep single link") return nil } if consPort == 0 || consPort == syncPort { //still work - log.Error("consensus port invalid,keep single link") + log.Warn("[p2p]consensus port invalid,keep single link") } else { err = this.startConsListening(consPort) if err != nil { @@ -415,12 +406,12 @@ func (this *NetServer) startSyncListening(port uint16) error { var err error this.synclistener, err = createListener(port) if err != nil { - log.Error("failed to create sync listener") - return errors.New("failed to create sync listener") + log.Error("[p2p]failed to create sync listener") + return errors.New("[p2p]failed to create sync listener") } go this.startSyncAccept(this.synclistener) - log.Infof("start listen on sync port %d", port) + log.Infof("[p2p]start listen on sync port %d", port) return nil } @@ -429,12 +420,12 @@ func (this *NetServer) startConsListening(port uint16) error { var err error this.conslistener, err = createListener(port) if err != nil { - log.Error("failed to create cons listener") - return errors.New("failed to create cons listener") + log.Error("[p2p]failed to create cons listener") + return errors.New("[p2p]failed to create cons listener") } go this.startConsAccept(this.conslistener) - log.Infof("Start listen on consensus port %d", port) + log.Infof("[p2p]Start listen on consensus port %d", port) return nil } @@ -442,17 +433,19 @@ func (this *NetServer) startConsListening(port uint16) error { func (this *NetServer) startSyncAccept(listener net.Listener) { for { conn, err := listener.Accept() + if err != nil { - log.Error("error accepting ", err.Error()) + log.Error("[p2p]error accepting ", err.Error()) return } + + log.Debug("[p2p]remote sync node connect with ", + conn.RemoteAddr(), conn.LocalAddr()) if !this.AddrValid(conn.RemoteAddr().String()) { - log.Warnf("remote %s not in reserved list, close it ", conn.RemoteAddr()) + log.Warnf("[p2p]remote %s not in reserved list, close it ", conn.RemoteAddr()) conn.Close() continue } - log.Info("remote sync node connect with ", - conn.RemoteAddr(), conn.LocalAddr()) if this.IsAddrInInConnRecord(conn.RemoteAddr().String()) { conn.Close() @@ -461,7 +454,7 @@ func (this *NetServer) startSyncAccept(listener net.Listener) { syncAddrCount := uint(this.GetInConnRecordLen()) if syncAddrCount >= config.DefConfig.P2PNode.MaxConnInBound { - log.Warnf("SyncAccept: total connections(%d) reach the max limit(%d), conn closed", + log.Warnf("[p2p]SyncAccept: total connections(%d) reach the max limit(%d), conn closed", syncAddrCount, config.DefConfig.P2PNode.MaxConnInBound) conn.Close() continue @@ -469,13 +462,13 @@ func (this *NetServer) startSyncAccept(listener net.Listener) { remoteIp, err := common.ParseIPAddr(conn.RemoteAddr().String()) if err != nil { - log.Error("parse ip error ", err.Error()) + log.Warn("[p2p]parse ip error ", err.Error()) conn.Close() continue } connNum := this.GetIpCountInInConnRecord(remoteIp) if connNum >= config.DefConfig.P2PNode.MaxConnInBoundForSingleIP { - log.Warnf("SyncAccept: connections(%d) with ip(%s) has reach the max limit(%d), "+ + log.Warnf("[p2p]SyncAccept: connections(%d) with ip(%s) has reach the max limit(%d), "+ "conn closed", connNum, remoteIp, config.DefConfig.P2PNode.MaxConnInBoundForSingleIP) conn.Close() continue @@ -499,20 +492,20 @@ func (this *NetServer) startConsAccept(listener net.Listener) { for { conn, err := listener.Accept() if err != nil { - log.Error("error accepting ", err.Error()) + log.Error("[p2p]error accepting ", err.Error()) return } + log.Debug("[p2p]remote cons node connect with ", + conn.RemoteAddr(), conn.LocalAddr()) if !this.AddrValid(conn.RemoteAddr().String()) { - log.Warnf("remote %s not in reserved list, close it ", conn.RemoteAddr()) + log.Warnf("[p2p]remote %s not in reserved list, close it ", conn.RemoteAddr()) conn.Close() continue } - log.Info("remote cons node connect with ", - conn.RemoteAddr(), conn.LocalAddr()) remoteIp, err := common.ParseIPAddr(conn.RemoteAddr().String()) if err != nil { - log.Error("parse ip error ", err.Error()) + log.Warn("[p2p]parse ip error ", err.Error()) conn.Close() continue } @@ -541,6 +534,7 @@ func (this *NetServer) AddOutConnectingList(addr string) (added bool) { return false } } + log.Debug("[p2p]add to out connecting list", addr) this.ConnectingAddrs = append(this.ConnectingAddrs, addr) return true } @@ -555,6 +549,7 @@ func (this *NetServer) RemoveFromConnectingList(addr string) { addrs = append(addrs, a) } } + log.Debug("[p2p]remove from out connecting list", addr) this.ConnectingAddrs = addrs } @@ -619,6 +614,7 @@ func (this *NetServer) IsNbrPeerAddr(addr string, isConsensus bool) bool { func (this *NetServer) AddPeerSyncAddress(addr string, p *peer.Peer) { this.PeerAddrMap.Lock() defer this.PeerAddrMap.Unlock() + log.Debugf("[p2p]AddPeerSyncAddress %s\n", addr) this.PeerSyncAddress[addr] = p } @@ -626,6 +622,7 @@ func (this *NetServer) AddPeerSyncAddress(addr string, p *peer.Peer) { func (this *NetServer) AddPeerConsAddress(addr string, p *peer.Peer) { this.PeerAddrMap.Lock() defer this.PeerAddrMap.Unlock() + log.Debugf("[p2p]AddPeerConsAddress %s\n", addr) this.PeerConsAddress[addr] = p } @@ -635,6 +632,7 @@ func (this *NetServer) RemovePeerSyncAddress(addr string) { defer this.PeerAddrMap.Unlock() if _, ok := this.PeerSyncAddress[addr]; ok { delete(this.PeerSyncAddress, addr) + log.Debugf("[p2p]delete Sync Address %s\n", addr) } } @@ -644,6 +642,7 @@ func (this *NetServer) RemovePeerConsAddress(addr string) { defer this.PeerAddrMap.Unlock() if _, ok := this.PeerConsAddress[addr]; ok { delete(this.PeerConsAddress, addr) + log.Debugf("[p2p]delete Cons Address %s\n", addr) } } @@ -664,6 +663,7 @@ func (this *NetServer) AddInConnRecord(addr string) { } } this.inConnRecord.InConnectingAddrs = append(this.inConnRecord.InConnectingAddrs, addr) + log.Debugf("[p2p]add in record %s\n", addr) } //IsAddrInInConnRecord return result whether addr is in inConnRecordList @@ -702,6 +702,7 @@ func (this *NetServer) RemoveFromInConnRecord(addr string) { addrs = append(addrs, a) } } + log.Debugf("[p2p]remove in record %s\n", addr) this.inConnRecord.InConnectingAddrs = addrs } @@ -737,6 +738,7 @@ func (this *NetServer) AddOutConnRecord(addr string) { } } this.outConnRecord.OutConnectingAddrs = append(this.outConnRecord.OutConnectingAddrs, addr) + log.Debugf("[p2p]add out record %s\n", addr) } //IsAddrInOutConnRecord return result whether addr is in outConnRecord @@ -761,6 +763,7 @@ func (this *NetServer) RemoveFromOutConnRecord(addr string) { addrs = append(addrs, a) } } + log.Debugf("[p2p]remove out record %s\n", addr) this.outConnRecord.OutConnectingAddrs = addrs } @@ -776,7 +779,7 @@ func (this *NetServer) AddrValid(addr string) bool { if config.DefConfig.P2PNode.ReservedPeersOnly && len(config.DefConfig.P2PNode.ReservedCfg.ReservedPeers) > 0 { for _, ip := range config.DefConfig.P2PNode.ReservedCfg.ReservedPeers { if strings.HasPrefix(addr, ip) { - log.Info("found reserved peer :", addr) + log.Info("[p2p]found reserved peer :", addr) return true } } @@ -796,7 +799,7 @@ func (this *NetServer) IsOwnAddress(addr string) bool { //Set own network address func (this *NetServer) SetOwnAddress(addr string) { if addr != this.OwnAddress { - log.Infof("set own address %s", addr) + log.Infof("[p2p]set own address %s", addr) this.OwnAddress = addr } diff --git a/p2pserver/net/netserver/netserver_test.go b/p2pserver/net/netserver/netserver_test.go index f049cfa192..fc612fecac 100644 --- a/p2pserver/net/netserver/netserver_test.go +++ b/p2pserver/net/netserver/netserver_test.go @@ -19,27 +19,18 @@ package netserver import ( - "bytes" - "encoding/binary" "fmt" "testing" "time" - "github.com/ontio/ontology-crypto/keypair" - "github.com/ontio/ontology/account" "github.com/ontio/ontology/common/log" "github.com/ontio/ontology/p2pserver/common" "github.com/ontio/ontology/p2pserver/peer" ) -var key keypair.PublicKey - func init() { log.Init(log.Stdout) fmt.Println("Start test the netserver...") - acct := account.NewAccount("SHA256withECDSA") - key = acct.PubKey() - } func creatPeers(cnt uint16) []*peer.Peer { @@ -65,7 +56,7 @@ func creatPeers(cnt uint16) []*peer.Peer { } func TestNewNetServer(t *testing.T) { - server := NewNetServer(key) + server := NewNetServer() server.Start() defer server.Halt() @@ -73,20 +64,11 @@ func TestNewNetServer(t *testing.T) { if server.GetHeight() != 1000 { t.Error("TestNewNetServer set server height error") } - var id uint64 - k := keypair.SerializePublicKey(key) - err := binary.Read(bytes.NewBuffer(k[:8]), binary.LittleEndian, &(id)) - if err != nil { - t.Error(err) - } - if server.GetID() != id { - t.Error("TestNewNetServer server id error") - } + if server.GetRelay() != true { t.Error("TestNewNetServer server relay state error", server.GetRelay()) } - //not exsit in default config - if server.GetServices() != 0 { + if server.GetServices() != 1 { t.Error("TestNewNetServer server service state error", server.GetServices()) } if server.GetVersion() != common.PROTOCOL_VERSION { @@ -98,16 +80,14 @@ func TestNewNetServer(t *testing.T) { if server.GetConsPort() != 20339 { t.Error("TestNewNetServer sync port error", server.GetConsPort()) } - if server.GetPubKey() != key { - t.Error("TestNewNetServer public key error") - } + fmt.Printf("lastest server time is %s\n", time.Unix(server.GetTime()/1e9, 0).String()) } func TestNetServerNbrPeer(t *testing.T) { log.Init(log.Stdout) - server := NewNetServer(key) + server := NewNetServer() server.Start() defer server.Halt() @@ -120,7 +100,7 @@ func TestNetServerNbrPeer(t *testing.T) { if server.GetConnectionCnt() != 5 { t.Error("TestNetServerNbrPeer GetConnectionCnt error", server.GetConnectionCnt()) } - addrs, _ := server.GetNeighborAddrs() + addrs := server.GetNeighborAddrs() if len(addrs) != 5 { t.Error("TestNetServerNbrPeer GetNeighborAddrs error") } diff --git a/p2pserver/p2pserver.go b/p2pserver/p2pserver.go index 788dd64431..e7406853ae 100644 --- a/p2pserver/p2pserver.go +++ b/p2pserver/p2pserver.go @@ -94,12 +94,12 @@ func (this *P2PServer) Start() error { if this.network != nil { this.network.Start() } else { - return errors.New("p2p network invalid") + return errors.New("[p2p]network invalid") } if this.msgRouter != nil { this.msgRouter.Start() } else { - return errors.New("p2p msg router invalid") + return errors.New("[p2p]msg router invalid") } this.tryRecentPeers() go this.connectSeedService() @@ -147,28 +147,28 @@ func (this *P2PServer) Xmit(message interface{}) error { isConsensus := false switch message.(type) { case *types.Transaction: - log.Debug("TX transaction message") + log.Debug("[p2p]TX transaction message") txn := message.(*types.Transaction) msg = msgpack.NewTxn(txn) case *types.Block: - log.Debug("TX block message") + log.Debug("[p2p]TX block message") block := message.(*types.Block) msg = msgpack.NewBlock(block) case *msgtypes.ConsensusPayload: - log.Debug("TX consensus message") + log.Debug("[p2p]TX consensus message") consensusPayload := message.(*msgtypes.ConsensusPayload) msg = msgpack.NewConsensus(consensusPayload) isConsensus = true case comm.Uint256: - log.Debug("TX block hash message") + log.Debug("[p2p]TX block hash message") hash := message.(comm.Uint256) // construct inv message invPayload := msgpack.NewInvPayload(comm.BLOCK, []comm.Uint256{hash}) msg = msgpack.NewInv(invPayload) default: - log.Warnf("Unknown Xmit message %v , type %v", message, + log.Warnf("[p2p]Unknown Xmit message %v , type %v", message, reflect.TypeOf(message)) - return errors.New("Unknown Xmit message type") + return errors.New("[p2p]Unknown Xmit message type") } this.network.Xmit(msg, isConsensus) return nil @@ -180,9 +180,9 @@ func (this *P2PServer) Send(p *peer.Peer, msg msgtypes.Message, if this.network.IsPeerEstablished(p) { return this.network.Send(p, msg, isConsensus) } - log.Errorf("P2PServer send to a not ESTABLISH peer %d", + log.Warnf("[p2p]send to a not ESTABLISH peer %d", p.GetID()) - return errors.New("send to a not ESTABLISH peer") + return errors.New("[p2p]send to a not ESTABLISH peer") } // GetID returns local node id @@ -258,7 +258,7 @@ func (this *P2PServer) WaitForSyncBlkFinish() { for { headerHeight := this.ledger.GetCurrentHeaderHeight() currentBlkHeight := this.ledger.GetCurrentBlockHeight() - log.Info("WaitForSyncBlkFinish... current block height is ", + log.Info("[p2p]WaitForSyncBlkFinish... current block height is ", currentBlkHeight, " ,current header height is ", headerHeight) if this.blockSyncFinished() { @@ -273,7 +273,7 @@ func (this *P2PServer) WaitForSyncBlkFinish() { func (this *P2PServer) WaitForPeersStart() { periodTime := config.DEFAULT_GEN_BLOCK_TIME / common.UPDATE_RATE_PER_BLOCK for { - log.Info("Wait for minimum connection...") + log.Info("[p2p]Wait for minimum connection...") if this.reachMinConnection() { break } @@ -289,17 +289,17 @@ func (this *P2PServer) connectSeeds() { for _, n := range config.DefConfig.Genesis.SeedList { ip, err := common.ParseIPAddr(n) if err != nil { - log.Warnf("seed peer %s address format is wrong", n) + log.Warnf("[p2p]seed peer %s address format is wrong", n) continue } ns, err := net.LookupHost(ip) if err != nil { - log.Warnf("resolve err: %s", err.Error()) + log.Warnf("[p2p]resolve err: %s", err.Error()) continue } port, err := common.ParseIPPort(n) if err != nil { - log.Warnf("seed peer %s address format is wrong", n) + log.Warnf("[p2p]seed peer %s address format is wrong", n) continue } seedNodes = append(seedNodes, ns[0]+port) @@ -366,7 +366,7 @@ func (this *P2PServer) retryInactivePeer() { nodeAddr := ip.To16().String() + ":" + strconv.Itoa(int(p.GetSyncPort())) if p.GetSyncState() == common.INACTIVITY { - log.Infof(" try reconnect %s", nodeAddr) + log.Debugf("[p2p] try reconnect %s", nodeAddr) //add addr to retry list this.addToRetryList(nodeAddr) p.CloseSync() @@ -383,7 +383,7 @@ func (this *P2PServer) retryInactivePeer() { connCount := uint(this.network.GetOutConnRecordLen()) if connCount >= config.DefConfig.P2PNode.MaxConnOutBound { - log.Warnf("Connect: out connections(%d) reach the max limit(%d)", connCount, + log.Warnf("[p2p]Connect: out connections(%d) reach the max limit(%d)", connCount, config.DefConfig.P2PNode.MaxConnOutBound) return } @@ -420,9 +420,9 @@ func (this *P2PServer) retryInactivePeer() { this.ReconnectAddrs.Unlock() for _, addr := range addrs { rand.Seed(time.Now().UnixNano()) - log.Info("Try to reconnect peer, peer addr is ", addr) + log.Debug("[p2p]Try to reconnect peer, peer addr is ", addr) <-time.After(time.Duration(rand.Intn(common.CONN_MAX_BACK)) * time.Millisecond) - log.Info("Back off time`s up, start connect node") + log.Debug("[p2p]Back off time`s up, start connect node") this.network.Connect(addr, false) } @@ -511,7 +511,7 @@ func (this *P2PServer) timeout() { t := p.GetContactTime() if t.Before(time.Now().Add(-1 * time.Second * time.Duration(periodTime) * common.KEEPALIVE_TIMEOUT)) { - log.Warnf("keep alive timeout!!!lost remote peer %d - %s from %s", p.GetID(), p.SyncLink.GetAddr(), t.String()) + log.Warnf("[p2p]keep alive timeout!!!lost remote peer %d - %s from %s", p.GetID(), p.SyncLink.GetAddr(), t.String()) p.CloseSync() p.CloseCons() } @@ -550,17 +550,17 @@ func (this *P2PServer) tryRecentPeers() { if comm.FileExisted(common.RECENT_FILE_NAME) { buf, err := ioutil.ReadFile(common.RECENT_FILE_NAME) if err != nil { - log.Error("read %s fail:%s, connect recent peers cancel", common.RECENT_FILE_NAME, err.Error()) + log.Warn("[p2p]read %s fail:%s, connect recent peers cancel", common.RECENT_FILE_NAME, err.Error()) return } err = json.Unmarshal(buf, &this.recentPeers) if err != nil { - log.Error("parse recent peer file fail: ", err) + log.Warn("[p2p]parse recent peer file fail: ", err) return } if len(this.recentPeers[netID]) > 0 { - log.Info("try to connect recent peer") + log.Info("[p2p]try to connect recent peer") } for _, v := range this.recentPeers[netID] { go this.network.Connect(v, false) @@ -635,12 +635,12 @@ func (this *P2PServer) syncPeerAddr() { if changed { buf, err := json.Marshal(this.recentPeers) if err != nil { - log.Error("package recent peer fail: ", err) + log.Warn("[p2p]package recent peer fail: ", err) return } err = ioutil.WriteFile(common.RECENT_FILE_NAME, buf, os.ModePerm) if err != nil { - log.Error("write recent peer fail: ", err) + log.Warn("[p2p]write recent peer fail: ", err) } } } diff --git a/p2pserver/p2pserver_test.go b/p2pserver/p2pserver_test.go index 5f19e2496e..2d8af5192a 100644 --- a/p2pserver/p2pserver_test.go +++ b/p2pserver/p2pserver_test.go @@ -19,54 +19,28 @@ package p2pserver import ( - "bytes" - "encoding/binary" "fmt" "testing" - "time" - "github.com/ontio/ontology-crypto/keypair" - "github.com/ontio/ontology/account" "github.com/ontio/ontology/common/log" "github.com/ontio/ontology/p2pserver/common" ) -var key keypair.PublicKey -var acct *account.Account - func init() { - log.Init(log.Stdout) + log.InitLog(log.InfoLog) fmt.Println("Start test the netserver...") - acct = account.NewAccount("SHA256withECDSA") - key = acct.PubKey() } func TestNewP2PServer(t *testing.T) { log.Init(log.Stdout) fmt.Println("Start test new p2pserver...") - p2p, err := NewServer(acct) - if err != nil { - t.Fatalf("TestP2PActorServer: p2pserver NewServer error %s", err) - } - //false because the ledger actor not running - p2p.Start(false) - defer p2p.Stop() + p2p := NewServer() if p2p.GetVersion() != common.PROTOCOL_VERSION { t.Error("TestNewP2PServer p2p version error", p2p.GetVersion()) } - var id uint64 - k := keypair.SerializePublicKey(key) - err = binary.Read(bytes.NewBuffer(k[:8]), binary.LittleEndian, &(id)) - if err != nil { - t.Error(err) - } - - if p2p.GetID() != id { - t.Error("TestNewP2PServer p2p id error") - } if p2p.GetVersion() != common.PROTOCOL_VERSION { t.Error("TestNewP2PServer p2p version error") } @@ -78,6 +52,4 @@ func TestNewP2PServer(t *testing.T) { if cons != 20339 { t.Error("TestNewP2PServer consensus port error") } - go p2p.WaitForSyncBlkFinish() - <-time.After(time.Second * common.KEEPALIVE_TIMEOUT) } diff --git a/p2pserver/peer/nbr_peers.go b/p2pserver/peer/nbr_peers.go index c190adbb1c..ca2e4d799a 100644 --- a/p2pserver/peer/nbr_peers.go +++ b/p2pserver/peer/nbr_peers.go @@ -66,7 +66,7 @@ func (this *NbrPeers) AddNbrNode(p *Peer) { defer this.Unlock() if this.NodeExisted(p.GetID()) { - fmt.Printf("insert an existed node\n") + fmt.Printf("[p2p]insert an existed node\n") } else { this.List[p.GetID()] = p } diff --git a/p2pserver/peer/peer.go b/p2pserver/peer/peer.go index bbe46be436..1bf8fe982a 100644 --- a/p2pserver/peer/peer.go +++ b/p2pserver/peer/peer.go @@ -20,7 +20,6 @@ package peer import ( "errors" - "fmt" "net" "runtime" "sync" @@ -152,23 +151,23 @@ func NewPeer() *Peer { //rmPeer print a debug log when peer be finalized by system func rmPeer(p *Peer) { - log.Debug(fmt.Sprintf("Remove unused peer: %d", p.GetID())) + log.Debugf("[p2p]Remove unused peer: %d", p.GetID()) } //DumpInfo print all information of peer func (this *Peer) DumpInfo() { - log.Info("Node info:") - log.Info("\t syncState = ", this.syncState) - log.Info("\t consState = ", this.consState) - log.Info("\t id = ", this.GetID()) - log.Info("\t addr = ", this.SyncLink.GetAddr()) - log.Info("\t cap = ", this.cap) - log.Info("\t version = ", this.GetVersion()) - log.Info("\t services = ", this.GetServices()) - log.Info("\t syncPort = ", this.GetSyncPort()) - log.Info("\t consPort = ", this.GetConsPort()) - log.Info("\t relay = ", this.GetRelay()) - log.Info("\t height = ", this.GetHeight()) + log.Debug("[p2p]Node info:") + log.Debug("[p2p]\t syncState = ", this.syncState) + log.Debug("[p2p]\t consState = ", this.consState) + log.Debug("[p2p]\t id = ", this.GetID()) + log.Debug("[p2p]\t addr = ", this.SyncLink.GetAddr()) + log.Debug("[p2p]\t cap = ", this.cap) + log.Debug("[p2p]\t version = ", this.GetVersion()) + log.Debug("[p2p]\t services = ", this.GetServices()) + log.Debug("[p2p]\t syncPort = ", this.GetSyncPort()) + log.Debug("[p2p]\t consPort = ", this.GetConsPort()) + log.Debug("[p2p]\t relay = ", this.GetRelay()) + log.Debug("[p2p]\t height = ", this.GetHeight()) } //GetVersion return peer`s version @@ -236,7 +235,7 @@ func (this *Peer) SendToSync(msg types.Message) error { if this.SyncLink != nil && this.SyncLink.Valid() { return this.SyncLink.Tx(msg) } - return errors.New("sync link invalid") + return errors.New("[p2p]sync link invalid") } //SendToCons call consensus link to send buffer @@ -244,7 +243,7 @@ func (this *Peer) SendToCons(msg types.Message) error { if this.ConsLink != nil && this.ConsLink.Valid() { return this.ConsLink.Tx(msg) } - return errors.New("cons link invalid") + return errors.New("[p2p]cons link invalid") } //CloseSync halt sync connection @@ -309,8 +308,8 @@ func (this *Peer) GetAddr16() ([16]byte, error) { } ip := net.ParseIP(addrIp).To16() if ip == nil { - log.Error("parse ip address error\n", this.GetAddr()) - return result, errors.New("parse ip address error") + log.Warn("[p2p]parse ip address error\n", this.GetAddr()) + return result, errors.New("[p2p]parse ip address error") } copy(result[:], ip[:16])