Skip to main content
added 2593 characters in body
Source Link
Geo
  • 21
  • 3
//Copyright (c) 2022, Geolffrey Mena <[email protected]>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package noise

import (
    "context"
    "io"
    "log"
    "net"

     "github.com/geolffreym/p2p-noise/errors""time"
)

// Default protocol
const PROTOCOL = "tcp"

// futureDeadline calculate a new time for deadline since now.
func futureDeadLine(deadline time.Duration) time.Time {
    return time.Now().Add(deadline * time.Second)
}

type Config interface {
    MaxPeersConnected() uint8
    MaxPayloadSize() uint32
    PeerDeadline() time.Duration
}

type Node struct {
    sentinel chan bool // Channel flag waiting for signal to close connection.
    routersentinel chan bool

 *Router   // Routing hash table eg. {Socket: Conn interface}.
    events router *router

 *Events   // Pubsub notifications.
    events *events

    // Configuration settings
    config Config
}

// New create a new node with default
func NewNodeNew(config Config) *Node {
    return &Node{
        router:make(chan bool),
        newRouter(),
        events:   newEvents(),
        sentinel: make(chan bool)config,
    }
}

// Events proxy channels event to subscriber.
// The listening routine should be stopped using context param.
func (n *Node) Events(ctx context.Context) <-chan Message {
    ch := make(chan Message)
    go n.events.Subscriber().Listen(ctx, ch)
    return ch // read only channel <-chanfor raw messages
}

// EmitMessage emit a new message to socket.
// If socket doesn't exists or peer is not connected return error.
// Calling EmitMessage extends write deadline.
func (n *Node) EmitMessage(socket Socket, message []byte) (int, error) {
    peer := n.router.Query(socket)
    if peer == nil {
        return 0, ErrSendingMessageToInvalidPeer(socket)
    }

    bytes, err := peer.Send(message)
    // An idle timeout can be implemented by repeatedly extending
    // the deadline after successful Read or Write calls.
    // SetWriteDeadline sets the deadline for future Write calls
    // and any currently-blocked Write call.
    // Even if write times out, it may return n > 0, indicating that
    // some of the data was successfully written.
    idle := futureDeadLine(n.config.PeerDeadline())
    peer.SetWriteDeadline(idle)
    return bytes, err
}

// watch keep running waiting for incoming messages.
// After every new message the connection is verified, if local connection is closed or remote peer is disconnected the watch routine is stopped.
// Incoming message monitor is suggested to be processed in go routines.
func (n *Node) watch(peer *Peer) {
    buf := make([]byte, 1024)

KEEPALIVE:
    for {
 
        // SyncWaiting bufferfor readingnew incoming message
        _buf, err := peer.ReceiveListen(bufn.config.MaxPayloadSize())
        // If connection is closed
        if n.Closed() {
            // stop routines watching for peers
            return
        }

        if err != nil {
            // net: don't return io.EOF from zero byte reads
           OverflowError //is ifreturned errwhen ==the io.EOFincoming thenpayload peerexceed connectionthe isexpected closedsize
            _, isNetError := err.(*net.OpError)
            if err == io.EOF || isNetError {
                // Close disconnected peer
                if erroverflow := peer.Close(); err != nil {
                    log.Fatal(errors.Closing(err).Error()OverflowError)
                }

        // Don't stop listening for peer if overflow payload is returned.
        if err != nil && !overflow {
            // net: don't return io.EOF from zero byte reads
            // Notify about the remote peer state
                n.events.PeerDisconnected([]byte(peer.Socket()))
                // Remove peer from router table
                n.router.Remove(peer)
                return
            }

        if buf == nil {
            // `buf` is nil if no more bytes received but peer is still connected
            // Keep alive always that zero bytes are not received
            break KEEPALIVE
        }

        // Emit new incoming message notification
        n.events.NewMessage(buf)

        // An idle timeout can be implemented by repeatedly extending
        // the deadline after successful Read or Write calls.
        // SetReadDeadline sets the deadline for future Read calls
        // and any currently-blocked Read call.
        idle := futureDeadLine(n.config.PeerDeadline())
        peer.SetReadDeadline(idle)

    }

}

// routing initialize route in routing table from connection interface.
// If TCP protocol is used connection is enforced to keep alive.
// It return new peer added to table.
func (n *Node) routing(conn net.Conn) (*Peer, error) {

    // Assertion for tcp connection to keep alive
    connection, okisTCP := conn.(*net.TCPConn)
    if okisTCP {
        // If tcp enforce keep alive connection
        // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.
        connection.SetKeepAlive(true)
    }

    // Drop connections if max peers exceeded
    if n.router.Len() >= n.config.MaxPeersConnected() {
        log.Fatalf("max peers exceeded: MaxPeerConnected = %d", n.config.MaxPeersConnected())
        return nil, ErrExceededMaxPeers(n.config.MaxPeersConnected())
    }

    // Initial deadline for connection.
    // A deadline is an absolute time after which I/O operations
    // fail instead of blocking. The deadline applies to all future
    // and pending I/O, not just the immediately following call to
    // Read or Write. After a deadline has been exceeded, the
    // connection can be refreshed by setting a deadline in the future.
    // ref: https://pkg.go.dev/net#Conn
    idle := futureDeadLine(n.config.PeerDeadline())
    connection.SetDeadline(idle)
    // Routing connections
    remote := connection.RemoteAddr().String()
    // eg. 192.168.1.1:8080
    socket := Socket(remote)
    // We need to know how interact with peer based on socket and connection
    peer := newPeer(socket, connection)
    // Store new peer in router table
    n.router.Add(peer)
    return peer, nil
}

// Listen start listening on the given address and wait for new connection.
// Return error if error occurred while listening.
func (n *Node) Listen(addr stringSocket) error {

    listener, err := net.Listen(PROTOCOL, addr)
    if err != nil {
        return err
    }

    // Dispatch event on start listening
    n.events.Listening([]byte(addr))
    //wait until sentinel channel is closed to close listener
    godefer func(listener net.Listener) {
        <-n.sentinel
        err := listener.Close()
        if err != nil {
            log.Fatal(errors.ClosingErrClosingConnection(err).Error())
        }
    }(listener)

    for {
        // Block/Hold while waiting for new incoming connection
        // Synchronized incoming connections
        conn, err := listener.Accept()
        // If connection is closed
        if n.Closed() {
            // Graceful stop listening
            return nil
        }

        if err != nil {
            log.Fatal(errors.BindingErrBindingConnection(err).Error())
            return err
        }

        // Routing for accepted connection
        peer, err := n.routing(conn) 
 // Routing for connection    if err != nil {
        go n   conn.watchClose(peer) // Drop connection
            continue
        }

        go n.watch(peer) // Wait for incoming messages
        // Dispatch event for new peer connected
        payload := []byte(peer.Socket())
        n.events.PeerConnected(payload)
    }

}

// Table return current routing table.
func (n *Node) Table() Table {
    return n.router.Table()
}

// Closed check connection state.
// Return true for connection open else false.
func (n *Node) Closed() bool {
    select {
    // select await for sentinel if not closed then default is returned.
    case <-n.sentinel:
        return true
    default:
        return false
    }
}

// Close all peers connections and stop listening
func (n *Node) Close() {
    for _, peer := range n.router.Table() {
        go func(p *Peer) {
            if err := p.Close(); err != nil {
                log.Fatal(errors.ClosingErrClosingConnection(err).Error())
            }
        }(peer)
    }

    // Dispatch event on node get closed
    n.events.ClosedConnection()
    // If channel get closed then all routines waiting for connections
    // or waiting for incoming messages get closed too.
    close(n.sentinel)
}

// Dial attempt to connect to remote node and add connected peer to routing table.
// Return error if error occurred while dialing node.
func (n *Node) Dial(addr stringSocket) error {
    conn, err := net.Dial(PROTOCOL, addr)
    if err != nil {
        return errors.DialingErrDialingNode(err, addr)
    }

    // Routing for dialed connection
    peer, err := n.routing(conn)
    if err != nil {
        conn.Close() // Routing forDrop connection
    go n.watch   return ErrDialingNode(peererr, addr) 
    }

    go n.watch(peer) // Wait for incoming messages
    // Dispatch event for new peer connected
    n.events.PeerConnected([]byte(peer.Socket()))
    return nil
}
 

//Copyrightimport (c) 
 2022, Geolffrey Mena <[email protected]>"context"
    "log"

//P2P Noise Secure handshake noise "github.
 com/geolffreym/p2p-noise"
//See also: http://www.noiseprotocol  "github.orgcom/geolffreym/p2p-noise.html#introduction/config"
package)

func main() {

import (   // Create configurations from params and write in configurations reference
    "context"configurations := config.New()
    "log"configurations.Write(
        config.SetMaxPeersConnected(10),
    noise "github   config.com/geolffreym/p2p-noise"SetPeerDeadline(1800),
    )

func main() {  // Node factory
    node := noise.NewNodeNew(configurations)
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    var events :<-chan noise.Message = node.Events(ctx)

    go func() {
        for msg := range events {
            // Here could be handled events
            if msg.Type() == noise.SelfListening {
                log.Printf("Listening on: %s \n", msg.Payload())
                cancel() // stop listening for events
            }
        }
    }()

    // ... some code here
    // node.Dial("192.168.1.1:4008")
    // node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

//Copyright (c) 2022, Geolffrey Mena <[email protected]>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package noise

import (
    "context"
    "io"
    "log"
    "net"

     "github.com/geolffreym/p2p-noise/errors"
)

// Default protocol
const PROTOCOL = "tcp"

type Node struct {
    sentinel chan bool // Channel flag waiting for signal to close connection.
    router   *Router   // Routing hash table eg. {Socket: Conn interface}.
    events   *Events   // Pubsub notifications.
}

func NewNode() *Node {
    return &Node{
        router:   newRouter(),
        events:   newEvents(),
        sentinel: make(chan bool),
    }
}

// Events proxy channels event to subscriber
// The listening routine should be stopped using context param.
func (n *Node) Events(ctx context.Context) <-chan Message {
    ch := make(chan Message)
    go n.events.Subscriber().Listen(ctx, ch)
    return ch // read only channel <-chan
}

// watch keep running waiting for incoming messages.
// After every new message the connection is verified, if local connection is closed or remote peer is disconnected the watch routine is stopped.
// Incoming message monitor is suggested to be processed in go routines.
func (n *Node) watch(peer *Peer) {
    buf := make([]byte, 1024)

KEEPALIVE:
    for {
        // Sync buffer reading
        _, err := peer.Receive(buf)
        // If connection is closed
        if n.Closed() {
            // stop routines watching for peers
            return
        }

        if err != nil {
            // net: don't return io.EOF from zero byte reads
            // if err == io.EOF then peer connection is closed
            _, isNetError := err.(*net.OpError)
            if err == io.EOF || isNetError {
                // Close disconnected peer
                if err := peer.Close(); err != nil {
                    log.Fatal(errors.Closing(err).Error())
                }

                // Notify about the remote peer state
                n.events.PeerDisconnected([]byte(peer.Socket()))
                // Remove peer from router table
                n.router.Remove(peer)
                return
            }

            // Keep alive always that zero bytes are not received
            break KEEPALIVE
        }

        // Emit new incoming message notification
        n.events.NewMessage(buf)
    }

}

// routing initialize route in routing table from connection interface.
// If TCP protocol is used connection is enforced to keep alive.
// It return new peer added to table.
func (n *Node) routing(conn net.Conn) *Peer {

    // Assertion for tcp connection to keep alive
    connection, ok := conn.(*net.TCPConn)
    if ok {
        // If tcp enforce keep alive connection
        // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.
        connection.SetKeepAlive(true)
    }

    // Routing connections
    remote := connection.RemoteAddr().String()
    // eg. 192.168.1.1:8080
    socket := Socket(remote)
    // We need to know how interact with peer based on socket and connection
    peer := newPeer(socket, connection)
    n.router.Add(peer)
    return peer
}

// Listen start listening on the given address and wait for new connection.
// Return error if error occurred while listening.
func (n *Node) Listen(addr string) error {

    listener, err := net.Listen(PROTOCOL, addr)
    if err != nil {
        return err
    }

    // Dispatch event on start listening
    n.events.Listening([]byte(addr))
    //wait until sentinel channel is closed to close listener
    go func(listener net.Listener) {
        <-n.sentinel
        err := listener.Close()
        if err != nil {
            log.Fatal(errors.Closing(err).Error())
        }
    }(listener)

    for {
        // Block/Hold while waiting for new incoming connection
        // Synchronized incoming connections
        conn, err := listener.Accept()
        // If connection is closed
        if n.Closed() {
            // Graceful stop listening
            return nil
        }

        if err != nil {
            log.Fatal(errors.Binding(err).Error())
            return err
        }

        peer := n.routing(conn) // Routing for connection
        go n.watch(peer)        // Wait for incoming messages
        // Dispatch event for new peer connected
        payload := []byte(peer.Socket())
        n.events.PeerConnected(payload)
    }

}

// Table return current routing table
func (n *Node) Table() Table {
    return n.router.Table()
}

// Closed check connection state.
// Return true for connection open else false
func (n *Node) Closed() bool {
    select {
    case <-n.sentinel:
        return true
    default:
        return false
    }
}

// Close all peers connections and stop listening
func (n *Node) Close() {
    for _, peer := range n.router.Table() {
        go func(p *Peer) {
            if err := p.Close(); err != nil {
                log.Fatal(errors.Closing(err).Error())
            }
        }(peer)
    }

    // Dispatch event on node get closed
    n.events.ClosedConnection()
    // If channel get closed then all routines waiting for connections
    // or waiting for incoming messages get closed too.
    close(n.sentinel)
}

// Dial attempt to connect to remote node and add connected peer to routing table.
// Return error if error occurred while dialing node.
func (n *Node) Dial(addr string) error {
    conn, err := net.Dial(PROTOCOL, addr)
    if err != nil {
        return errors.Dialing(err, addr)
    }

    peer := n.routing(conn) // Routing for connection
    go n.watch(peer)        // Wait for incoming messages
    // Dispatch event for new peer connected
    n.events.PeerConnected([]byte(peer.Socket()))
    return nil
}

//Copyright (c) 2022, Geolffrey Mena <[email protected]>

//P2P Noise Secure handshake.
 //
//See also: http://www.noiseprotocol.org/noise.html#introduction
package main

import (
    "context"
    "log"

    noise "github.com/geolffreym/p2p-noise"
)

func main() {
    node := noise.NewNode()
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    events := node.Events(ctx)

    go func() {
        for msg := range events {
            log.Printf("Listening on: %s \n", msg.Payload())
            cancel() // stop listening for events
        }
    }()

    // ... some code here
    // node.Dial("192.168.1.1:4008")
    // node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

package noise

import (
    "context"
    "log"
    "net"
    "time"
)

// Default protocol
const PROTOCOL = "tcp"

// futureDeadline calculate a new time for deadline since now.
func futureDeadLine(deadline time.Duration) time.Time {
    return time.Now().Add(deadline * time.Second)
}

type Config interface {
    MaxPeersConnected() uint8
    MaxPayloadSize() uint32
    PeerDeadline() time.Duration
}

type Node struct {
    // Channel flag waiting for signal to close connection.
    sentinel chan bool

    // Routing hash table eg. {Socket: Conn interface}.
    router *router

    // Pubsub notifications.
    events *events

    // Configuration settings
    config Config
}

// New create a new node with default
func New(config Config) *Node {
    return &Node{
        make(chan bool),
        newRouter(),
        newEvents(),
        config,
    }
}

// Events proxy channels to subscriber.
// The listening routine should be stopped using context param.
func (n *Node) Events(ctx context.Context) <-chan Message {
    ch := make(chan Message)
    go n.events.Subscriber().Listen(ctx, ch)
    return ch // read only channel for raw messages
}

// EmitMessage emit a new message to socket.
// If socket doesn't exists or peer is not connected return error.
// Calling EmitMessage extends write deadline.
func (n *Node) EmitMessage(socket Socket, message []byte) (int, error) {
    peer := n.router.Query(socket)
    if peer == nil {
        return 0, ErrSendingMessageToInvalidPeer(socket)
    }

    bytes, err := peer.Send(message)
    // An idle timeout can be implemented by repeatedly extending
    // the deadline after successful Read or Write calls.
    // SetWriteDeadline sets the deadline for future Write calls
    // and any currently-blocked Write call.
    // Even if write times out, it may return n > 0, indicating that
    // some of the data was successfully written.
    idle := futureDeadLine(n.config.PeerDeadline())
    peer.SetWriteDeadline(idle)
    return bytes, err
}

// watch keep running waiting for incoming messages.
// After every new message the connection is verified, if local connection is closed or remote peer is disconnected the watch routine is stopped.
// Incoming message monitor is suggested to be processed in go routines.
func (n *Node) watch(peer *Peer) {

KEEPALIVE:
    for {
 
        // Waiting for new incoming message
        buf, err := peer.Listen(n.config.MaxPayloadSize())
        // If connection is closed
        if n.Closed() {
            // stop routines watching for peers
            return
        }

        // OverflowError is returned when the incoming payload exceed the expected size
        _, overflow := err.(OverflowError)

        // Don't stop listening for peer if overflow payload is returned.
        if err != nil && !overflow {
            // net: don't return io.EOF from zero byte reads
            // Notify about the remote peer state
            n.events.PeerDisconnected([]byte(peer.Socket()))
            // Remove peer from router table
            n.router.Remove(peer)
            return
        }

        if buf == nil {
            // `buf` is nil if no more bytes received but peer is still connected
            // Keep alive always that zero bytes are not received
            break KEEPALIVE
        }

        // Emit new incoming message notification
        n.events.NewMessage(buf)

        // An idle timeout can be implemented by repeatedly extending
        // the deadline after successful Read or Write calls.
        // SetReadDeadline sets the deadline for future Read calls
        // and any currently-blocked Read call.
        idle := futureDeadLine(n.config.PeerDeadline())
        peer.SetReadDeadline(idle)

    }

}

// routing initialize route in routing table from connection interface.
// If TCP protocol is used connection is enforced to keep alive.
// It return new peer added to table.
func (n *Node) routing(conn net.Conn) (*Peer, error) {

    // Assertion for tcp connection to keep alive
    connection, isTCP := conn.(*net.TCPConn)
    if isTCP {
        // If tcp enforce keep alive connection
        // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.
        connection.SetKeepAlive(true)
    }

    // Drop connections if max peers exceeded
    if n.router.Len() >= n.config.MaxPeersConnected() {
        log.Fatalf("max peers exceeded: MaxPeerConnected = %d", n.config.MaxPeersConnected())
        return nil, ErrExceededMaxPeers(n.config.MaxPeersConnected())
    }

    // Initial deadline for connection.
    // A deadline is an absolute time after which I/O operations
    // fail instead of blocking. The deadline applies to all future
    // and pending I/O, not just the immediately following call to
    // Read or Write. After a deadline has been exceeded, the
    // connection can be refreshed by setting a deadline in the future.
    // ref: https://pkg.go.dev/net#Conn
    idle := futureDeadLine(n.config.PeerDeadline())
    connection.SetDeadline(idle)
    // Routing connections
    remote := connection.RemoteAddr().String()
    // eg. 192.168.1.1:8080
    socket := Socket(remote)
    // We need to know how interact with peer based on socket and connection
    peer := newPeer(socket, connection)
    // Store new peer in router table
    n.router.Add(peer)
    return peer, nil
}

// Listen start listening on the given address and wait for new connection.
// Return error if error occurred while listening.
func (n *Node) Listen(addr Socket) error {

    listener, err := net.Listen(PROTOCOL, addr)
    if err != nil {
        return err
    }

    // Dispatch event on start listening
    n.events.Listening([]byte(addr))
    //wait until sentinel channel is closed to close listener
    defer func() {
        err := listener.Close()
        if err != nil {
            log.Fatal(ErrClosingConnection(err).Error())
        }
    }()

    for {
        // Block/Hold while waiting for new incoming connection
        // Synchronized incoming connections
        conn, err := listener.Accept()
        // If connection is closed
        if n.Closed() {
            // Graceful stop listening
            return nil
        }

        if err != nil {
            log.Fatal(ErrBindingConnection(err).Error())
            return err
        }

        // Routing for accepted connection
        peer, err := n.routing(conn) 
        if err != nil {
            conn.Close() // Drop connection
            continue
        }

        go n.watch(peer) // Wait for incoming messages
        // Dispatch event for new peer connected
        payload := []byte(peer.Socket())
        n.events.PeerConnected(payload)
    }

}

// Table return current routing table.
func (n *Node) Table() Table {
    return n.router.Table()
}

// Closed check connection state.
// Return true for connection open else false.
func (n *Node) Closed() bool {
    select {
    // select await for sentinel if not closed then default is returned.
    case <-n.sentinel:
        return true
    default:
        return false
    }
}

// Close all peers connections and stop listening
func (n *Node) Close() {
    for _, peer := range n.router.Table() {
        go func(p *Peer) {
            if err := p.Close(); err != nil {
                log.Fatal(ErrClosingConnection(err).Error())
            }
        }(peer)
    }

    // Dispatch event on node get closed
    n.events.ClosedConnection()
    // If channel get closed then all routines waiting for connections
    // or waiting for incoming messages get closed too.
    close(n.sentinel)
}

// Dial attempt to connect to remote node and add connected peer to routing table.
// Return error if error occurred while dialing node.
func (n *Node) Dial(addr Socket) error {
    conn, err := net.Dial(PROTOCOL, addr)
    if err != nil {
        return ErrDialingNode(err, addr)
    }

    // Routing for dialed connection
    peer, err := n.routing(conn)
    if err != nil {
        conn.Close() // Drop connection
        return ErrDialingNode(err, addr) 
    }

    go n.watch(peer) // Wait for incoming messages
    // Dispatch event for new peer connected
    n.events.PeerConnected([]byte(peer.Socket()))
    return nil
}
 

import ( 
    "context"
    "log"

    noise "github.com/geolffreym/p2p-noise"
    "github.com/geolffreym/p2p-noise/config"
)

func main() {

    // Create configurations from params and write in configurations reference
    configurations := config.New()
    configurations.Write(
        config.SetMaxPeersConnected(10),
        config.SetPeerDeadline(1800),
    )

    // Node factory
    node := noise.New(configurations)
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    var events <-chan noise.Message = node.Events(ctx)

    go func() {
        for msg := range events {
            // Here could be handled events
            if msg.Type() == noise.SelfListening {
                log.Printf("Listening on: %s \n", msg.Payload())
                cancel() // stop listening for events
            }
        }
    }()

    // ... some code here
    // node.Dial("192.168.1.1:4008")
    // node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

added 75 characters in body
Source Link
Geo
  • 21
  • 3
// NodeCopyright implements(c) a2022, lightweightGeolffrey TCPMena communication<gmjun2000@gmail.com>

// Offers pretty basic features toP2P communicateNoise betweenSecure nodeshandshake.
//
// See also: httpshttp://pkgwww.gonoiseprotocol.devorg/net#Connnoise.html#introduction
package noise

import (
    "context"
    "io"
    "log"
    "net"

    "github.com/geolffreym/p2p-noise/errors"
)

// Default protocol
const PROTOCOL = "tcp"

type (
    Socket = string
    Table  = map[Socket]*Peer
)

// Node implement communication logic
type Node struct {
    sentinel chan bool // Channel flag waiting for signal to close connection.
    router   *Router   // Routing hash table eg. {Socket: Conn interface}.
    events   *Events   // Pubsub notifications.
}

// Node factory
// It receive a param events message handler for network.
func NewNode() *Node {
    return &Node{
        router:   newRouter(),
        events:   newEvents(),
        sentinel: make(chan bool),
    }
}

// ProxyEvents eventproxy tochannels event to subscriber
// listenerThe listening routine should be stopped using context param.
func (noden *Node) Events(cb Observer)ctx context.CancelFuncContext) <-chan Message {
    ctx, cancelch := context.WithCancel(context.Backgroundmake()chan Message)
    go noden.events.Subscriber().Listen(ctx, cbch)
    return cancelch // read only channel <-chan
}

// watch watchdogkeep running waiting for incoming messages.
// incomingAfter every new message the connection is verified, if local connection is closed or remote peer is disconnected the watch routine is stopped.
// Incoming message monitor is suggested to be processed in go routines.
func (noden *Node) watch(peer *Peer) {
    buf := make([]byte, 1024)

KEEPALIVE:
    for {
        // Sync buffer reading
        _, err := peer.Receive(buf)
        // If connection is closed
        //if stopn.Closed() routines{
 watching peers
        if node.Closed() {// stop routines watching for peers
            return
        }

        if err != nil {
            // net: don't return io.EOF from zero byte reads
            // if err == io.EOF then peer connection is closed
            _, isNetError := err.(*net.OpError)
            if err == io.EOF || isNetError {
                err := peer.Close() // Close disconnected peer
                if err := peer.Close(); err != nil {
                    log.Fatal(errors.Closing(err).Error())
                }

                //Notify to nodeNotify about the remote peer state
                noden.events.PeerDisconnected([]byte(peer.Socket()))
                // Remove peer from router table
                noden.router.DeleteRemove(peer)
                return
            }

            // Keep alive always that zero bytes are not received
            break KEEPALIVE
        }

        // Emit new incoming message notification
        noden.events.NewMessage(buf)
    }

}

// routing initialize route in routing table from connection interface.
// ReturnIf TCP protocol is used connection is enforced to keep alive.
// It return new peer added to table.
func (noden *Node) routing(conn net.Conn) *Peer {

    // Assertion for tcp connection to keep alive
    connection, isTCPok := conn.(*net.TCPConn)
    if isTCPok {
        // If tcp enforce keep alive connection
        // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.
        connection.SetKeepAlive(true)
    }

    // Routing connections
    remote := connection.RemoteAddr().String()
    // eg. 192.168.1.1:8080
    socket := Socket(remote)
    // We need to know how interact with peer based on socket and connection
    peer := newPeer(socket, connection)
    noden.router.Add(peer)
    return peer
}

// Listen start listening on the given address and wait for new connection.
// Return error if error occurred while listening.
func (noden *Node) Listen(addr string) error {

    listener, err := net.Listen(PROTOCOL, addr)
    if err != nil {
        return err
    }

    // Dispatch event on start listening
    noden.events.Listening([]byte(addr))
    //wait monitoruntil connectionsentinel channel is closed to close listener
    go func(listener net.Listener) {
        <-noden.sentinel
        err := listener.Close()
        if err != nil {
            log.Fatal(errors.Closing(err).Error())
        }
    }(listener)

    for {
        // Block/Hold while waiting for new incoming connection
        // Synchronized incoming connections
        conn, err := listener.Accept()
        // If connection is closed
        // Gracefulif stopn.Closed() listening{
        if node.Closed() {  // Graceful stop listening
            return nil
        }

        if err != nil {
            log.Fatal(errors.Binding(err).Error())
            return err
        }

        peer := noden.routing(conn) // Routing for connection
        go noden.watch(peer)        // Wait for incoming messages
        // Dispatch event for new peer connected
        payload := []byte(peer.Socket())
        noden.events.PeerConnected(payload)
    }

}

// ReturnTable return current routing table
func (noden *Node) Table() Table {
    return noden.router.Table()
}

// Closed Non-blocking check connection state.
// Return true for connection open else false
func (noden *Node) Closed() bool {
    select {
    case <-noden.sentinel:
        return true
    default:
        return false
    }
}

// Close all peers connections and stop listening
func (noden *Node) Close() {
    for _, peer := range noden.router.Table() {
        go func(p *Peer) {
            if err := p.Close(); err != nil {
                log.Fatal(errors.Closing(err).Error())
            }
        }(peer)
    }

    // Dispatch event on node get closed
    noden.events.ClosedConnection()
    // If channel get closed then all routines waiting for connections
    // or waiting for incoming messages get closed too.
    close(noden.sentinel)
}

// Dial attempt to connect to remote node and add connected peer to routing table.
// Return error if error occurred while dialing node.
func (noden *Node) Dial(addr string) error {
    conn, err := net.Dial(PROTOCOL, addr)
    if err != nil {
        return errors.Dialing(err, addr)
    }

    peer := noden.routing(conn) // Routing for connection
    go noden.watch(peer)        // Wait for incoming messages
    // Dispatch event for new peer connected
    noden.events.PeerConnected([]byte(peer.Socket()))
    return nil
}

//Copyright (c) 2022, Geolffrey Mena <[email protected]>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package main

import (
    "context"
    "log"

    noise "github.com/geolffreym/p2p-noise"
)

func main() {
    
    node := noise.NewNode()
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    events := node.Events(ctx)

    go func() {
        for msg := range events {
            log.Printf("Listening on: %s \n", msg.Payload())
            cancel() // stop listening for events
        }
    }()

    // ... some code here
    // node.Dial("192.168.1.1:4008")
    // node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

// Node implements a lightweight TCP communication.
// Offers pretty basic features to communicate between nodes.
//
// See also: https://pkg.go.dev/net#Conn
package noise

import (
    "context"
    "io"
    "log"
    "net"

    "github.com/geolffreym/p2p-noise/errors"
)

// Default protocol
const PROTOCOL = "tcp"

type (
    Socket = string
    Table  = map[Socket]*Peer
)

// Node implement communication logic
type Node struct {
    sentinel chan bool // Channel flag waiting for signal to close connection.
    router   *Router   // Routing hash table eg. {Socket: Conn interface}.
    events   *Events   // Pubsub notifications.
}

// Node factory
// It receive a param events message handler for network.
func NewNode() *Node {
    return &Node{
        router:   newRouter(),
        events:   newEvents(),
        sentinel: make(chan bool),
    }
}

// Proxy event to event subscriber listener
func (node *Node) Events(cb Observer) context.CancelFunc {
    ctx, cancel := context.WithCancel(context.Background())
    go node.events.Subscriber().Listen(ctx, cb)
    return cancel
}

// watch watchdog for incoming messages.
// incoming message monitor is suggested to be processed in go routines.
func (node *Node) watch(peer *Peer) {
    buf := make([]byte, 1024)

KEEPALIVE:
    for {
        // Sync buffer reading
        _, err := peer.Receive(buf)
        // If connection is closed
        // stop routines watching peers
        if node.Closed() {
            return
        }

        if err != nil {
            // net: don't return io.EOF from zero byte reads
            // if err == io.EOF then peer connection is closed
            _, isNetError := err.(*net.OpError)
            if err == io.EOF || isNetError {
                err := peer.Close() // Close disconnected peer
                if err != nil {
                    log.Fatal(errors.Closing(err).Error())
                }

                //Notify to node about the peer state
                node.events.PeerDisconnected([]byte(peer.Socket()))
                // Remove peer from router table
                node.router.Delete(peer)
                return
            }

            // Keep alive always that zero bytes are not received
            break KEEPALIVE
        }

        // Emit new incoming message notification
        node.events.NewMessage(buf)
    }

}

// routing initialize route in routing table from connection interface
// Return new peer added to table
func (node *Node) routing(conn net.Conn) *Peer {

    // Assertion for tcp connection to keep alive
    connection, isTCP := conn.(*net.TCPConn)
    if isTCP {
        // If tcp enforce keep alive connection
        // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.
        connection.SetKeepAlive(true)
    }

    // Routing connections
    remote := connection.RemoteAddr().String()
    // eg. 192.168.1.1:8080
    socket := Socket(remote)
    // We need to know how interact with peer based on socket and connection
    peer := newPeer(socket, connection)
    node.router.Add(peer)
    return peer
}

// Listen start listening on the given address and wait for new connection.
// Return error if error occurred while listening.
func (node *Node) Listen(addr string) error {

    listener, err := net.Listen(PROTOCOL, addr)
    if err != nil {
        return err
    }

    // Dispatch event on start listening
    node.events.Listening([]byte(addr))
    // monitor connection to close listener
    go func(listener net.Listener) {
        <-node.sentinel
        err := listener.Close()
        if err != nil {
            log.Fatal(errors.Closing(err).Error())
        }
    }(listener)

    for {
        // Block/Hold while waiting for new incoming connection
        // Synchronized incoming connections
        conn, err := listener.Accept()
        // If connection is closed
        // Graceful stop listening
        if node.Closed() {
            return nil
        }

        if err != nil {
            log.Fatal(errors.Binding(err).Error())
            return err
        }

        peer := node.routing(conn) // Routing for connection
        go node.watch(peer)        // Wait for incoming messages
        // Dispatch event for new peer connected
        payload := []byte(peer.Socket())
        node.events.PeerConnected(payload)
    }

}

// Return current routing table
func (node *Node) Table() Table {
    return node.router.Table()
}

// Closed Non-blocking check connection state.
// Return true for connection open else false
func (node *Node) Closed() bool {
    select {
    case <-node.sentinel:
        return true
    default:
        return false
    }
}

// Close all peers connections and stop listening
func (node *Node) Close() {
    for _, peer := range node.router.Table() {
        go func(p *Peer) {
            if err := p.Close(); err != nil {
                log.Fatal(errors.Closing(err).Error())
            }
        }(peer)
    }

    // Dispatch event on node get closed
    node.events.ClosedConnection()
    // If channel get closed then all routines waiting for connections
    // or waiting for incoming messages get closed too.
    close(node.sentinel)
}

// Dial to node and add connected peer to routing table
// Return error if error occurred while dialing node.
func (node *Node) Dial(addr string) error {
    conn, err := net.Dial(PROTOCOL, addr)
    if err != nil {
        return errors.Dialing(err, addr)
    }

    peer := node.routing(conn) // Routing for connection
    go node.watch(peer)        // Wait for incoming messages
    // Dispatch event for new peer connected
    node.events.PeerConnected([]byte(peer.Socket()))
    return nil
}

//Copyright (c) 2022, Geolffrey Mena <[email protected]>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package main

import (
    "context"
    "log"

    noise "github.com/geolffreym/p2p-noise"
)

func main() {
    
    node := noise.NewNode()
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    events := node.Events(ctx)

    go func() {
        for msg := range events {
            log.Printf("Listening on: %s \n", msg.Payload())
            cancel() // stop listening for events
        }
    }()

    // ... some code here
    node.Dial("192.168.1.1:4008")
    node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

//Copyright (c) 2022, Geolffrey Mena <gmjun2000@gmail.com>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package noise

import (
    "context"
    "io"
    "log"
    "net"

    "github.com/geolffreym/p2p-noise/errors"
)

// Default protocol
const PROTOCOL = "tcp"

type Node struct {
    sentinel chan bool // Channel flag waiting for signal to close connection.
    router   *Router   // Routing hash table eg. {Socket: Conn interface}.
    events   *Events   // Pubsub notifications.
}

func NewNode() *Node {
    return &Node{
        router:   newRouter(),
        events:   newEvents(),
        sentinel: make(chan bool),
    }
}

// Events proxy channels event to subscriber
// The listening routine should be stopped using context param.
func (n *Node) Events(ctx context.Context) <-chan Message {
    ch := make(chan Message)
    go n.events.Subscriber().Listen(ctx, ch)
    return ch // read only channel <-chan
}

// watch keep running waiting for incoming messages.
// After every new message the connection is verified, if local connection is closed or remote peer is disconnected the watch routine is stopped.
// Incoming message monitor is suggested to be processed in go routines.
func (n *Node) watch(peer *Peer) {
    buf := make([]byte, 1024)

KEEPALIVE:
    for {
        // Sync buffer reading
        _, err := peer.Receive(buf)
        // If connection is closed
        if n.Closed() {
            // stop routines watching for peers
            return
        }

        if err != nil {
            // net: don't return io.EOF from zero byte reads
            // if err == io.EOF then peer connection is closed
            _, isNetError := err.(*net.OpError)
            if err == io.EOF || isNetError {
                // Close disconnected peer
                if err := peer.Close(); err != nil {
                    log.Fatal(errors.Closing(err).Error())
                }

                // Notify about the remote peer state
                n.events.PeerDisconnected([]byte(peer.Socket()))
                // Remove peer from router table
                n.router.Remove(peer)
                return
            }

            // Keep alive always that zero bytes are not received
            break KEEPALIVE
        }

        // Emit new incoming message notification
        n.events.NewMessage(buf)
    }

}

// routing initialize route in routing table from connection interface.
// If TCP protocol is used connection is enforced to keep alive.
// It return new peer added to table.
func (n *Node) routing(conn net.Conn) *Peer {

    // Assertion for tcp connection to keep alive
    connection, ok := conn.(*net.TCPConn)
    if ok {
        // If tcp enforce keep alive connection
        // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection.
        connection.SetKeepAlive(true)
    }

    // Routing connections
    remote := connection.RemoteAddr().String()
    // eg. 192.168.1.1:8080
    socket := Socket(remote)
    // We need to know how interact with peer based on socket and connection
    peer := newPeer(socket, connection)
    n.router.Add(peer)
    return peer
}

// Listen start listening on the given address and wait for new connection.
// Return error if error occurred while listening.
func (n *Node) Listen(addr string) error {

    listener, err := net.Listen(PROTOCOL, addr)
    if err != nil {
        return err
    }

    // Dispatch event on start listening
    n.events.Listening([]byte(addr))
    //wait until sentinel channel is closed to close listener
    go func(listener net.Listener) {
        <-n.sentinel
        err := listener.Close()
        if err != nil {
            log.Fatal(errors.Closing(err).Error())
        }
    }(listener)

    for {
        // Block/Hold while waiting for new incoming connection
        // Synchronized incoming connections
        conn, err := listener.Accept()
        // If connection is closed
        if n.Closed() {
            // Graceful stop listening
            return nil
        }

        if err != nil {
            log.Fatal(errors.Binding(err).Error())
            return err
        }

        peer := n.routing(conn) // Routing for connection
        go n.watch(peer)        // Wait for incoming messages
        // Dispatch event for new peer connected
        payload := []byte(peer.Socket())
        n.events.PeerConnected(payload)
    }

}

// Table return current routing table
func (n *Node) Table() Table {
    return n.router.Table()
}

// Closed check connection state.
// Return true for connection open else false
func (n *Node) Closed() bool {
    select {
    case <-n.sentinel:
        return true
    default:
        return false
    }
}

// Close all peers connections and stop listening
func (n *Node) Close() {
    for _, peer := range n.router.Table() {
        go func(p *Peer) {
            if err := p.Close(); err != nil {
                log.Fatal(errors.Closing(err).Error())
            }
        }(peer)
    }

    // Dispatch event on node get closed
    n.events.ClosedConnection()
    // If channel get closed then all routines waiting for connections
    // or waiting for incoming messages get closed too.
    close(n.sentinel)
}

// Dial attempt to connect to remote node and add connected peer to routing table.
// Return error if error occurred while dialing node.
func (n *Node) Dial(addr string) error {
    conn, err := net.Dial(PROTOCOL, addr)
    if err != nil {
        return errors.Dialing(err, addr)
    }

    peer := n.routing(conn) // Routing for connection
    go n.watch(peer)        // Wait for incoming messages
    // Dispatch event for new peer connected
    n.events.PeerConnected([]byte(peer.Socket()))
    return nil
}

//Copyright (c) 2022, Geolffrey Mena <[email protected]>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package main

import (
    "context"
    "log"

    noise "github.com/geolffreym/p2p-noise"
)

func main() {
    node := noise.NewNode()
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    events := node.Events(ctx)

    go func() {
        for msg := range events {
            log.Printf("Listening on: %s \n", msg.Payload())
            cancel() // stop listening for events
        }
    }()

    // ... some code here
    // node.Dial("192.168.1.1:4008")
    // node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

deleted 75 characters in body
Source Link
Geo
  • 21
  • 3
listenAddr//Copyright :=(c) "127.0.02022, Geolffrey Mena <gmjun2000@gmail.1:4007"com>
localNode
//P2P :=Noise nodeSecure handshake.NewNode()
//
//See Interceptalso: internalhttp://www.noiseprotocol.org/noise.html#introduction
package messagesmain
localNode.Observe(func
import (msg 
 network.Message) {
  "context"
  switch msg.Type() {"log"

    casenoise network"github.SELF_LISTENING:com/geolffreym/p2p-noise"
)

func main() {
     
  fmt.Printf("Listening on: %snode \n",:= msgnoise.PayloadNewNode())
    case// network.NEWPEER_DETECTED:
Network events channel
    ctx, cancel := fmtcontext.PrintfWithCancel("Newcontext.Background())
 peer connected: %s \n",events msg:= node.PayloadEvents()ctx) 

    casego network.CLOSED_CONNECTION:
func() {
       fmt.Print("Closed connection")
for msg := range caseevents network.MESSAGE_RECEIVED:{
        fmt    log.Printf("New message"Listening receivedon: %s \n", msg.Payload())
    case network.PEER_DISCONNECTED:
        fmt.Printfcancel("Peer) disconnected:// %sstop \n",listening msg.Payload())for events
    default:

    }
    }()

    // .......

// closesome connection
localNode.Close()

//code orhere
 dial to other peer
localNodenode.Dial("192.168.1.101:3000"4008")
    node.Close()

    // .......
  more code here
// listen for incoming connections
localNodenode.Listen(listenAddr"127.0.0.1:4008")

}

listenAddr := "127.0.0.1:4007"
localNode := node.NewNode()

// Intercept internal messages
localNode.Observe(func(msg network.Message) {
    switch msg.Type() {
    case network.SELF_LISTENING:
        fmt.Printf("Listening on: %s \n", msg.Payload())
    case network.NEWPEER_DETECTED:
        fmt.Printf("New peer connected: %s \n", msg.Payload())
    case network.CLOSED_CONNECTION:
        fmt.Print("Closed connection")
    case network.MESSAGE_RECEIVED:
        fmt.Printf("New message received: %s \n", msg.Payload())
    case network.PEER_DISCONNECTED:
        fmt.Printf("Peer disconnected: %s \n", msg.Payload())
    default:

    }
})

// .......

// close connection
localNode.Close()

// or dial to other peer
localNode.Dial("192.168.1.10:3000")

// .......
 
// listen for incoming connections
localNode.Listen(listenAddr)
//Copyright (c) 2022, Geolffrey Mena <gmjun2000@gmail.com>

//P2P Noise Secure handshake.
//
//See also: http://www.noiseprotocol.org/noise.html#introduction
package main

import ( 
    "context"
    "log"

    noise "github.com/geolffreym/p2p-noise"
)

func main() {
     
    node := noise.NewNode()
    // Network events channel
    ctx, cancel := context.WithCancel(context.Background())
    events := node.Events(ctx) 

    go func() {
        for msg := range events {
            log.Printf("Listening on: %s \n", msg.Payload())
            cancel() // stop listening for events
        }
    }()

    // ... some code here
    node.Dial("192.168.1.1:4008")
    node.Close()

    // ... more code here
    node.Listen("127.0.0.1:4008")

}

updates to code since post creation
Source Link
Geo
  • 21
  • 3
Loading
added 6 characters in body
Source Link
Geo
  • 21
  • 3
Loading
deleted 15 characters in body
Source Link
Geo
  • 21
  • 3
Loading
added 13 characters in body
Source Link
Geo
  • 21
  • 3
Loading
added 829 characters in body
Source Link
Geo
  • 21
  • 3
Loading
added 829 characters in body
Source Link
Geo
  • 21
  • 3
Loading
added 46 characters in body; edited tags; edited title
Source Link
200_success
  • 145.7k
  • 22
  • 191
  • 481
Loading
deleted 19 characters in body
Source Link
Geo
  • 21
  • 3
Loading
changes made to code after post
Source Link
Geo
  • 21
  • 3
Loading
edited title
Link
Geo
  • 21
  • 3
Loading
edited title
Link
Geo
  • 21
  • 3
Loading
deleted 15 characters in body
Source Link
Geo
  • 21
  • 3
Loading
added 1 character in body
Source Link
Geo
  • 21
  • 3
Loading
deleted 37 characters in body
Source Link
Geo
  • 21
  • 3
Loading
added 33 characters in body
Source Link
Geo
  • 21
  • 3
Loading
edited body
Source Link
Geo
  • 21
  • 3
Loading
deleted 27 characters in body
Source Link
Geo
  • 21
  • 3
Loading
Source Link
Geo
  • 21
  • 3
Loading