Skip to content

Commit

Permalink
Initial IPv6 support (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 authored Jun 15, 2023
1 parent 21c6eae commit abfdcea
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 89 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ jobs:
go mod download
go mod verify
make test
env:
CI: true
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# Variables

## General Variables

# CI
CI ?= false

# Branch Variables
PROTECTED_BRANCH := master
CURRENT_BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
Expand Down Expand Up @@ -290,6 +294,7 @@ go-test: ## to run tests
$(AT)$(DOCKER) run ${DOCKER_OPTS} \
-v $(PWD):/app -w /app \
-e GOCACHE="/tmp" \
-e CI=${CI} \
$(DOCKER_IMAGE_GO) \
/bin/sh -c \
"cd /app && \
Expand Down
4 changes: 4 additions & 0 deletions config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ ice_address_tcp = ""
# The TCP port used to route media (audio/screen/video tracks). This is used to
# generate TCP candidates.
ice_port_tcp = 8443
# Enables experimental IPv6 support. When this setting is true the RTC service
# will work in dual-stack mode, listening for IPv6 connections and generating
# candidates in addition to IPv4 ones.
enable_ipv6 = false
# An optional hostname used to override the default value. By default, the
# service will try to guess its own public IP through STUN (if configured).
#
Expand Down
2 changes: 2 additions & 0 deletions service/rtc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type ServerConfig struct {
// A list of ICE server (STUN/TURN) configurations to use.
ICEServers ICEServers `toml:"ice_servers"`
TURNConfig TURNConfig `toml:"turn"`
// EnableIPv6 specifies whether or not IPv6 should be used.
EnableIPv6 bool `toml:"enable_ipv6"`
}

func (c ServerConfig) IsValid() error {
Expand Down
40 changes: 26 additions & 14 deletions service/rtc/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"runtime"
"syscall"
"time"
Expand All @@ -22,9 +23,9 @@ const (
tcpSocketWriteBufferSize = 1024 * 1024 * 4 // 4MB
)

// getSystemIPs returns a list of all the available IPv4 addresses.
func getSystemIPs(log mlog.LoggerIFace) ([]string, error) {
var ips []string
// getSystemIPs returns a list of all the available local addresses.
func getSystemIPs(log mlog.LoggerIFace, dualStack bool) ([]netip.Addr, error) {
var ips []netip.Addr

interfaces, err := net.Interfaces()
if err != nil {
Expand All @@ -34,36 +35,43 @@ func getSystemIPs(log mlog.LoggerIFace) ([]string, error) {
for _, iface := range interfaces {
// filter out inactive interfaces
if iface.Flags&net.FlagUp == 0 {
log.Debug("skipping inactive interface", mlog.String("interface", iface.Name))
log.Info("skipping inactive interface", mlog.String("interface", iface.Name))
continue
}

addrs, err := iface.Addrs()
if err != nil {
log.Debug("failed to get addresses for interface", mlog.String("interface", iface.Name))
log.Warn("failed to get addresses for interface", mlog.String("interface", iface.Name))
continue
}

for _, addr := range addrs {
ip, _, err := net.ParseCIDR(addr.String())
prefix, err := netip.ParsePrefix(addr.String())
if err != nil {
log.Debug("failed to parse address", mlog.Err(err), mlog.String("addr", addr.String()))
log.Warn("failed to parse prefix", mlog.Err(err), mlog.String("prefix", prefix.String()))
continue
}

// IPv4 only (for the time being at least, see MM-50294)
if ip.To4() == nil {
ip := prefix.Addr()

if !dualStack && ip.Is6() {
log.Debug("ignoring IPv6 address: dual stack support is disabled by config", mlog.String("addr", ip.String()))
continue
}

if ip.Is6() && !ip.IsGlobalUnicast() {
log.Debug("ignoring non global IPv6 address", mlog.String("addr", ip.String()))
continue
}

ips = append(ips, ip.String())
ips = append(ips, ip)
}
}

return ips, nil
}

func createUDPConnsForAddr(log mlog.LoggerIFace, listenAddress string) ([]net.PacketConn, error) {
func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string) ([]net.PacketConn, error) {
var conns []net.PacketConn

for i := 0; i < runtime.NumCPU(); i++ {
Expand All @@ -84,7 +92,7 @@ func createUDPConnsForAddr(log mlog.LoggerIFace, listenAddress string) ([]net.Pa
},
}

udpConn, err := listenConfig.ListenPacket(context.Background(), "udp4", listenAddress)
udpConn, err := listenConfig.ListenPacket(context.Background(), network, listenAddress)
if err != nil {
return nil, fmt.Errorf("failed to listen on udp: %w", err)
}
Expand Down Expand Up @@ -132,12 +140,12 @@ func createUDPConnsForAddr(log mlog.LoggerIFace, listenAddress string) ([]net.Pa
return conns, nil
}

func resolveHost(host string, timeout time.Duration) (string, error) {
func resolveHost(host, network string, timeout time.Duration) (string, error) {
var ip string
r := net.Resolver{}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
addrs, err := r.LookupIP(ctx, "ip4", host)
addrs, err := r.LookupIP(ctx, network, host)
if err != nil {
return ip, fmt.Errorf("failed to resolve host %q: %w", host, err)
}
Expand All @@ -146,3 +154,7 @@ func resolveHost(host string, timeout time.Duration) (string, error) {
}
return ip, err
}

func areAddressesSameStack(addrA, addrB netip.Addr) bool {
return (addrA.Is4() && addrB.Is4()) || (addrA.Is6() && addrB.Is6())
}
79 changes: 67 additions & 12 deletions service/rtc/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package rtc

import (
"net/netip"
"os"
"runtime"
"testing"

Expand All @@ -20,9 +22,40 @@ func TestGetSystemIPs(t *testing.T) {
require.NoError(t, err)
}()

ips, err := getSystemIPs(log)
require.NoError(t, err)
require.NotEmpty(t, ips)
t.Run("ipv4", func(t *testing.T) {
ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.NotEmpty(t, ips)

for _, ip := range ips {
require.True(t, ip.Is4())
}
})

t.Run("dual stack", func(t *testing.T) {
// Skipping this test in CI since IPv6 is not yet supported by Github actions.
if os.Getenv("CI") != "" {
t.Skip()
}

ips, err := getSystemIPs(log, true)
require.NoError(t, err)
require.NotEmpty(t, ips)

var hasIPv4 bool
var hasIPv6 bool
for _, ip := range ips {
if ip.Is4() {
hasIPv4 = true
}
if ip.Is6() {
hasIPv6 = true
}
}

require.True(t, hasIPv4)
require.True(t, hasIPv6)
})
}

func TestCreateUDPConnsForAddr(t *testing.T) {
Expand All @@ -33,16 +66,38 @@ func TestCreateUDPConnsForAddr(t *testing.T) {
require.NoError(t, err)
}()

ips, err := getSystemIPs(log)
require.NoError(t, err)
require.NotEmpty(t, ips)
t.Run("IPv4", func(t *testing.T) {
ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.NotEmpty(t, ips)

for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, ip+":30443")
for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp4", netip.AddrPortFrom(ip, 30443).String())
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
for _, conn := range conns {
require.NoError(t, conn.Close())
}
}
})

t.Run("dual stack", func(t *testing.T) {
// Skipping this test in CI since IPv6 is not yet supported by Github actions.
if os.Getenv("CI") != "" {
t.Skip()
}

ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
for _, conn := range conns {
require.NoError(t, conn.Close())
require.NotEmpty(t, ips)

for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp", netip.AddrPortFrom(ip, 30443).String())
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
for _, conn := range conns {
require.NoError(t, conn.Close())
}
}
}
})
}
46 changes: 28 additions & 18 deletions service/rtc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"net"
"net/netip"
"sync"
"time"

Expand All @@ -32,8 +33,8 @@ type Server struct {

udpMux ice.UDPMux
tcpMux ice.TCPMux
publicAddrsMap map[string]string
localIPs []string
publicAddrsMap map[netip.Addr]string
localIPs []netip.Addr

sendCh chan Message
receiveCh chan Message
Expand Down Expand Up @@ -63,7 +64,7 @@ func NewServer(cfg ServerConfig, log mlog.LoggerIFace, metrics Metrics) (*Server
sendCh: make(chan Message, msgChSize),
receiveCh: make(chan Message, msgChSize),
bufPool: &sync.Pool{New: func() interface{} { return make([]byte, receiveMTU) }},
publicAddrsMap: make(map[string]string),
publicAddrsMap: make(map[netip.Addr]string),
}

return s, nil
Expand All @@ -83,9 +84,16 @@ func (s *Server) ReceiveCh() <-chan Message {
}

func (s *Server) Start() error {
var err error
udpNetwork := "udp4"
tcpNetwork := "tcp4"

localIPs, err := getSystemIPs(s.log)
if s.cfg.EnableIPv6 {
s.log.Info("rtc: experimental IPv6 support enabled")
udpNetwork = "udp"
tcpNetwork = "tcp"
}

localIPs, err := getSystemIPs(s.log, s.cfg.EnableIPv6)
if err != nil {
return fmt.Errorf("failed to get system IPs: %w", err)
}
Expand All @@ -95,34 +103,36 @@ func (s *Server) Start() error {

s.localIPs = localIPs

s.log.Debug("rtc: found local IPs", mlog.Any("ips", s.localIPs))

// Populate public IP addresses map if override is not set and STUN is provided.
if s.cfg.ICEHostOverride == "" && len(s.cfg.ICEServers) > 0 {
for _, ip := range localIPs {
udpListenAddr := fmt.Sprintf("%s:%d", ip, s.cfg.ICEPortUDP)
udpAddr, err := net.ResolveUDPAddr("udp4", udpListenAddr)
udpListenAddr := netip.AddrPortFrom(ip, uint16(s.cfg.ICEPortUDP)).String()
udpAddr, err := net.ResolveUDPAddr(udpNetwork, udpListenAddr)
if err != nil {
s.log.Error("failed to resolve UDP address", mlog.Err(err))
continue
}

// TODO: consider making this logic concurrent to lower total time taken
// in case of multiple interfaces.
addr, err := getPublicIP(udpAddr, s.cfg.ICEServers.getSTUN())
addr, err := getPublicIP(udpAddr, udpNetwork, s.cfg.ICEServers.getSTUN())
if err != nil {
s.log.Warn("failed to get public IP address for local interface", mlog.String("localAddr", ip), mlog.Err(err))
s.log.Warn("failed to get public IP address for local interface", mlog.String("localAddr", ip.String()), mlog.Err(err))
} else {
s.log.Info("got public IP address for local interface", mlog.String("localAddr", ip), mlog.String("remoteAddr", addr))
s.log.Info("got public IP address for local interface", mlog.String("localAddr", ip.String()), mlog.String("remoteAddr", addr))
}

s.publicAddrsMap[ip] = addr
}
}

if err := s.initUDP(localIPs); err != nil {
if err := s.initUDP(localIPs, udpNetwork); err != nil {
return err
}

if err := s.initTCP(); err != nil {
if err := s.initTCP(tcpNetwork); err != nil {
return err
}

Expand Down Expand Up @@ -291,11 +301,11 @@ func (s *Server) msgReader() {
}
}

func (s *Server) initUDP(localIPs []string) error {
func (s *Server) initUDP(localIPs []netip.Addr, network string) error {
var udpMuxes []ice.UDPMux

initUDPMux := func(addr string) error {
conns, err := createUDPConnsForAddr(s.log, addr)
conns, err := createUDPConnsForAddr(s.log, network, addr)
if err != nil {
return fmt.Errorf("failed to create UDP connections: %w", err)
}
Expand All @@ -315,7 +325,7 @@ func (s *Server) initUDP(localIPs []string) error {

// If an address is specified we create a single udp mux.
if s.cfg.ICEAddressUDP != "" {
if err := initUDPMux(fmt.Sprintf("%s:%d", s.cfg.ICEAddressUDP, s.cfg.ICEPortUDP)); err != nil {
if err := initUDPMux(net.JoinHostPort(s.cfg.ICEAddressUDP, fmt.Sprintf("%d", s.cfg.ICEPortUDP))); err != nil {
return err
}
s.udpMux = udpMuxes[0]
Expand All @@ -324,7 +334,7 @@ func (s *Server) initUDP(localIPs []string) error {

// If no address is specified we create a mux for each interface we find.
for _, ip := range localIPs {
if err := initUDPMux(fmt.Sprintf("%s:%d", ip, s.cfg.ICEPortUDP)); err != nil {
if err := initUDPMux(netip.AddrPortFrom(ip, uint16(s.cfg.ICEPortUDP)).String()); err != nil {
return err
}
}
Expand All @@ -334,8 +344,8 @@ func (s *Server) initUDP(localIPs []string) error {
return nil
}

func (s *Server) initTCP() error {
tcpListener, err := net.Listen("tcp4", fmt.Sprintf("%s:%d", s.cfg.ICEAddressTCP, s.cfg.ICEPortTCP))
func (s *Server) initTCP(network string) error {
tcpListener, err := net.Listen(network, net.JoinHostPort(s.cfg.ICEAddressTCP, fmt.Sprintf("%d", s.cfg.ICEPortTCP)))
if err != nil {
return fmt.Errorf("failed to create TCP listener: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion service/rtc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestStartServer(t *testing.T) {
require.NoError(t, err)
defer udpConn.Close()

ips, err := getSystemIPs(log)
ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.NotEmpty(t, ips)

Expand Down
Loading

0 comments on commit abfdcea

Please sign in to comment.