Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #69 from hyprspace/stream-reuse
Browse files Browse the repository at this point in the history
Improve Performance by Reusing Libp2p Streams
  • Loading branch information
alecbcs authored Feb 1, 2022
2 parents 4b1b189 + 275eb18 commit dcbdbe3
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 26 deletions.
34 changes: 32 additions & 2 deletions cli/down.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package cli

import (
"fmt"
"os"
"path/filepath"
"strconv"

"github.com/DataDrake/cli-ng/v2/cmd"
"github.com/hyprspace/hyprspace/tun"
Expand All @@ -26,7 +29,34 @@ func DownRun(r *cmd.Root, c *cmd.Sub) {
// Parse Command Args
args := c.Args.(*DownArgs)

fmt.Println("[+] ip link delete dev " + args.InterfaceName)
err := tun.Delete(args.InterfaceName)
// Parse Global Config Flag for Custom Config Path
configPath := r.Flags.(*GlobalFlags).Config
if configPath == "" {
configPath = "/etc/hyprspace/" + args.InterfaceName + ".yaml"
}

// Read lock from file system to stop process.
lockPath := filepath.Join(filepath.Dir(configPath), args.InterfaceName+".lock")
out, err := os.ReadFile(lockPath)
checkErr(err)

pid, err := strconv.Atoi(string(out))
checkErr(err)

process, err := os.FindProcess(pid)
checkErr(err)

err0 := process.Signal(os.Interrupt)

err1 := tun.Delete(args.InterfaceName)

// Different types of systems may need the tun devices destroyed first or
// the process to exit first don't worry as long as one of these two has
// suceeded.
if err0 != nil && err1 != nil {
checkErr(err0)
checkErr(err1)
}

fmt.Println("[+] deleted hyprspace " + args.InterfaceName + " daemon")
}
73 changes: 54 additions & 19 deletions cli/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
Expand All @@ -23,7 +23,6 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/net/ipv4"
)

var (
Expand All @@ -32,7 +31,9 @@ var (
tunDev *tun.TUN
// RevLookup allow quick lookups of an incoming stream
// for security before accepting or responding to any data.
RevLookup map[string]bool
RevLookup map[string]string
// activeStreams is a map of active streams to a peer
activeStreams map[string]network.Stream
)

// Up creates and brings up a Hyprspace Interface.
Expand Down Expand Up @@ -91,9 +92,9 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
}

// Setup reverse lookup hash map for authentication.
RevLookup = make(map[string]bool, len(cfg.Peers))
for _, id := range cfg.Peers {
RevLookup[id.ID] = true
RevLookup = make(map[string]string, len(cfg.Peers))
for ip, id := range cfg.Peers {
RevLookup[id.ID] = ip
}

fmt.Println("[+] Creating TUN Device")
Expand Down Expand Up @@ -158,6 +159,9 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
go p2p.Discover(ctx, host, dht, peerTable)
go prettyDiscovery(ctx, host, peerTable)

// Configure path for lock
lockPath := filepath.Join(filepath.Dir(cfg.Path), cfg.Interface.Name+".lock")

go func() {
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
Expand All @@ -169,9 +173,19 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
if err := host.Close(); err != nil {
panic(err)
}

// Remove daemon lock from file system.
err = os.Remove(lockPath)
checkErr(err)

// Exit the application.
os.Exit(0)
}()

// Write lock to filesystem to indicate an existing running daemon.
err = os.WriteFile(lockPath, []byte(fmt.Sprint(os.Getpid())), os.ModePerm)
checkErr(err)

// Bring Up TUN Device
err = tunDev.Up()
if err != nil {
Expand All @@ -180,28 +194,38 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {

fmt.Println("[+] Network Setup Complete...Waiting on Node Discovery")
// Listen For New Packets on TUN Interface
packet := make([]byte, 1420)
var stream network.Stream
var header *ipv4.Header
var plen int
activeStreams = make(map[string]network.Stream)
var packet = make([]byte, 1420)
for {
plen, err = tunDev.Iface.Read(packet)
checkErr(err)
header, _ = ipv4.ParseHeader(packet)
_, ok := cfg.Peers[header.Dst.String()]
plen, err := tunDev.Iface.Read(packet)
if err != nil {
log.Println(err)
continue
}
dst := net.IPv4(packet[16], packet[17], packet[18], packet[19]).String()
stream, ok := activeStreams[dst]
if ok {
stream, err = host.NewStream(ctx, peerTable[header.Dst.String()], p2p.Protocol)
_, err = stream.Write(packet[:plen])
if err == nil {
continue
}
stream.Close()
delete(activeStreams, dst)
ok = false
}
if peer, ok := peerTable[dst]; ok {
stream, err = host.NewStream(ctx, peer, p2p.Protocol)
if err != nil {
log.Println(err)
continue
}
stream.Write(packet[:plen])
stream.Close()
activeStreams[dst] = stream
}
}
}

func createDaemon(cfg config.Config, out chan<- error) {
func createDaemon(cfg *config.Config, out chan<- error) {
path, err := os.Executable()
checkErr(err)
// Create Pipe to monitor for daemon output.
Expand All @@ -224,6 +248,8 @@ func createDaemon(cfg config.Config, out chan<- error) {
count++
}
}

// Release the created daemon
err = process.Release()
checkErr(err)
if count < len(cfg.Peers) {
Expand All @@ -236,9 +262,18 @@ func streamHandler(stream network.Stream) {
// If the remote node ID isn't in the list of known nodes don't respond.
if _, ok := RevLookup[stream.Conn().RemotePeer().Pretty()]; !ok {
stream.Reset()
return
}
var packet = make([]byte, 1420)
for {
plen, err := stream.Read(packet)
if err != nil {
stream.Close()
delete(activeStreams, RevLookup[stream.Conn().RemotePeer().Pretty()])
return
}
tunDev.Iface.Write(packet[:plen])
}
io.Copy(tunDev.Iface.ReadWriteCloser, stream)
stream.Close()
}

func prettyDiscovery(ctx context.Context, node host.Host, peerTable map[string]peer.ID) {
Expand Down
17 changes: 13 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// Config is the main Configuration Struct for Hyprspace.
type Config struct {
Path string `yaml:"path,omitempty"`
Interface Interface `yaml:"interface"`
Peers map[string]Peer `yaml:"peers"`
}
Expand All @@ -27,12 +28,12 @@ type Peer struct {
}

// Read initializes a config from a file.
func Read(path string) (result Config, err error) {
func Read(path string) (*Config, error) {
in, err := os.ReadFile(path)
if err != nil {
return
return nil, err
}
result = Config{
result := Config{
Interface: Interface{
Name: "hs0",
ListenPort: 8001,
Expand All @@ -41,6 +42,14 @@ func Read(path string) (result Config, err error) {
PrivateKey: "",
},
}

// Read in config settings from file.
err = yaml.Unmarshal(in, &result)
return
if err != nil {
return nil, err
}

// Overwrite path of config to input.
result.Path = path
return &result, nil
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8
github.com/tcnksm/go-latest v0.0.0-20170313132115-e3007ae9052e
github.com/vishvananda/netlink v1.1.0
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781
golang.org/x/sys v0.0.0-20210603125802-9665404d3644 // indirect
gopkg.in/yaml.v2 v2.4.0
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
Expand Down

0 comments on commit dcbdbe3

Please sign in to comment.