summaryrefslogtreecommitdiffhomepage
path: root/src/peer.go
blob: fadc43f5ea99d0a3f4d2882c1df934194d27de82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main

import (
	"errors"
	"net"
	"sync"
	"time"
)

// No constants are declared in this group; it is currently empty.
const ()

// Peer holds the per-peer state of a Device: a numeric id, the remote UDP
// endpoint, handshake state and key pairs, byte counters, timestamps, timers,
// signaling channels and packet queues.
//
// Peers are created by (*Device).NewPeer, which allocates every channel in
// signal and queue and starts the peer's goroutines; Close closes signal.stop.
//
// NOTE(review): which fields are guarded by mutex is not visible in this
// file section — NewPeer holds it for the whole initialization; confirm the
// locking rules against the other users of Peer.
type Peer struct {
	id                          uint
	mutex                       sync.RWMutex
	endpoint                    *net.UDPAddr
	persistentKeepaliveInterval uint64
	keyPairs                    KeyPairs
	handshake                   Handshake
	device                      *Device
	txBytes                     uint64
	rxBytes                     uint64
	time                        struct {
		lastSend      time.Time // last send message
		lastHandshake time.Time // last completed handshake
	}
	// signal groups the channels used to notify the peer's goroutines of
	// events; the buffer size of each channel is noted next to it.
	signal struct {
		newKeyPair         chan struct{} // (size 1) : a new key pair was generated
		handshakeBegin     chan struct{} // (size 1) : request that a new handshake be started ("queue handshake")
		handshakeCompleted chan struct{} // (size 1) : handshake completed
		flushNonceQueue    chan struct{} // (size 1) : empty queued packets
		stop               chan struct{} // (size 0) : close to stop all goroutines for peer
	}
	// timer groups the peer's timers. NewPeer sets sendKeepalive from
	// stoppedTimer() and leaves handshakeTimeout unset (nil).
	timer struct {
		sendKeepalive    *time.Timer
		handshakeTimeout *time.Timer
	}
	// queue groups the buffered per-peer packet queues allocated in NewPeer.
	queue struct {
		nonce    chan *QueueOutboundElement // nonce / pre-handshake queue
		outbound chan *QueueOutboundElement // sequential ordering of work
		inbound  chan *QueueInboundElement  // sequential ordering of work
	}
	// mac is initialized from the peer's public key in NewPeer (mac.Init).
	mac MACStatePeer
}

// NewPeer creates the peer identified by the remote static public key pk,
// registers it in device.peers and starts the peer's worker goroutines.
//
// The returned peer has its MAC state initialized from pk, a fresh debugging
// id taken from device.idCounter, the static-static shared secret with the
// device's private key precomputed, and all queue and signal channels
// allocated. The new peer's mutex is held for the whole initialization and
// released when NewPeer returns.
//
// NewPeer panics with "bug: adding existing peer" if pk is already present in
// device.peers. device.mutex is released before the panic propagates, so a
// caller that recovers does not leave the device permanently locked.
//
// The started goroutines are presumably terminated through Close (which
// closes signal.stop); their implementations live outside this section.
func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
	// create peer

	peer := new(Peer)
	peer.mutex.Lock()
	defer peer.mutex.Unlock()

	peer.mac.Init(pk)
	peer.device = device
	peer.timer.sendKeepalive = stoppedTimer()

	// assign id for debugging and map public key
	//
	// The critical section is wrapped in a function literal so the deferred
	// unlock runs even when the duplicate check below panics.

	func() {
		device.mutex.Lock()
		defer device.mutex.Unlock()

		peer.id = device.idCounter
		device.idCounter++

		if _, exists := device.peers[pk]; exists {
			panic(errors.New("bug: adding existing peer"))
		}
		device.peers[pk] = peer
	}()

	// precompute DH

	handshake := &peer.handshake
	handshake.mutex.Lock()
	handshake.remoteStatic = pk
	handshake.precomputedStaticStatic = device.privateKey.sharedSecret(handshake.remoteStatic)
	handshake.mutex.Unlock()

	// prepare queuing

	peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize)
	peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
	peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)

	// prepare signaling

	peer.signal.stop = make(chan struct{})
	peer.signal.newKeyPair = make(chan struct{}, 1)
	peer.signal.handshakeBegin = make(chan struct{}, 1)
	peer.signal.handshakeCompleted = make(chan struct{}, 1)
	peer.signal.flushNonceQueue = make(chan struct{}, 1)

	// outbound pipeline

	go peer.RoutineNonce()
	go peer.RoutineHandshakeInitiator()
	go peer.RoutineSequentialSender()
	go peer.RoutineSequentialReceiver()

	return peer
}

// Close closes the peer's stop channel (signal.stop), the signal that tells
// the peer's goroutines to stop.
//
// Close must be called at most once per peer: closing an already-closed
// channel panics.
func (peer *Peer) Close() {
	stopSignal := peer.signal.stop
	close(stopSignal)
}