From 9fd98c97defdcd39497c9d78925425cf7b9db053 Mon Sep 17 00:00:00 2001 From: Mikolaj Walczak Date: Mon, 13 Aug 2018 05:28:42 -0700 Subject: Require sudo for integration tests --- .travis.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 142bea8..3876f7c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go -sudo: false +sudo: required go: - "1.8" @@ -11,6 +11,11 @@ go: before_install: - go get -t -v ./... +before_script: + - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then + sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6'; + fi + script: - ./.travis/tests.sh -- cgit v1.2.3 From 1c12a97ec2674a36612ddb4a4a627d407ee38b46 Mon Sep 17 00:00:00 2001 From: Mikolaj Walczak Date: Mon, 13 Aug 2018 05:04:49 -0700 Subject: Move DHCPv6 async to seperate package --- dhcpv6/async.go | 235 -------------------------------------------- dhcpv6/async/client.go | 226 ++++++++++++++++++++++++++++++++++++++++++ dhcpv6/async/client_test.go | 122 +++++++++++++++++++++++ dhcpv6/async_test.go | 54 ---------- dhcpv6/client.go | 4 +- dhcpv6/future.go | 111 --------------------- dhcpv6/future_test.go | 170 -------------------------------- 7 files changed, 350 insertions(+), 572 deletions(-) delete mode 100644 dhcpv6/async.go create mode 100644 dhcpv6/async/client.go create mode 100644 dhcpv6/async/client_test.go delete mode 100644 dhcpv6/async_test.go delete mode 100644 dhcpv6/future.go delete mode 100644 dhcpv6/future_test.go diff --git a/dhcpv6/async.go b/dhcpv6/async.go deleted file mode 100644 index e9930bf..0000000 --- a/dhcpv6/async.go +++ /dev/null @@ -1,235 +0,0 @@ -package dhcpv6 - -import ( - "context" - "fmt" - "net" - "sync" - "time" -) - -// AsyncClient implements an asynchronous DHCPv6 client -type AsyncClient struct { - ReadTimeout time.Duration - WriteTimeout time.Duration - LocalAddr net.Addr - RemoteAddr net.Addr - IgnoreErrors bool - - connection *net.UDPConn - cancel context.CancelFunc - stopping *sync.WaitGroup - receiveQueue chan DHCPv6 - sendQueue chan DHCPv6 - packetsLock sync.Mutex - packets map[uint32](chan Response) - errors chan error -} - -// NewAsyncClient creates an asynchronous client -func NewAsyncClient() *AsyncClient { - return &AsyncClient{ - ReadTimeout: DefaultReadTimeout, - WriteTimeout: DefaultWriteTimeout, - } -} - -// OpenForInterface starts the client on the specified interface, replacing -// client LocalAddr with a link-local address of the given interface and -// standard DHCP port (546). -func (c *AsyncClient) OpenForInterface(ifname string, bufferSize int) error { - addr, err := GetLinkLocalAddr(ifname) - if err != nil { - return err - } - c.LocalAddr = &net.UDPAddr{IP: *addr, Port: DefaultClientPort, Zone: ifname} - return c.Open(bufferSize) -} - -// Open starts the client -func (c *AsyncClient) Open(bufferSize int) error { - var ( - addr *net.UDPAddr - ok bool - err error - ) - - if addr, ok = c.LocalAddr.(*net.UDPAddr); !ok { - return fmt.Errorf("Invalid local address: %v not a net.UDPAddr", c.LocalAddr) - } - - // prepare the socket to listen on for replies - c.connection, err = net.ListenUDP("udp6", addr) - if err != nil { - return err - } - c.stopping = new(sync.WaitGroup) - c.sendQueue = make(chan DHCPv6, bufferSize) - c.receiveQueue = make(chan DHCPv6, bufferSize) - c.packets = make(map[uint32](chan Response)) - c.packetsLock = sync.Mutex{} - c.errors = make(chan error) - - var ctx context.Context - ctx, c.cancel = context.WithCancel(context.Background()) - go c.receiverLoop(ctx) - go c.senderLoop(ctx) - - return nil -} - -// Close stops the client -func (c *AsyncClient) Close() { - // Wait for sender and receiver loops - c.stopping.Add(2) - c.cancel() - c.stopping.Wait() - - close(c.sendQueue) - close(c.receiveQueue) - close(c.errors) - - c.connection.Close() -} - -// Errors returns a channel where runtime errors are posted -func (c *AsyncClient) Errors() <-chan error { - return c.errors -} - -func (c *AsyncClient) addError(err error) { - if !c.IgnoreErrors { - c.errors <- err - } -} - -func (c *AsyncClient) receiverLoop(ctx context.Context) { - defer func() { c.stopping.Done() }() - for { - select { - case <-ctx.Done(): - return - case packet := <-c.receiveQueue: - c.receive(packet) - } - } -} - -func (c *AsyncClient) senderLoop(ctx context.Context) { - defer func() { c.stopping.Done() }() - for { - select { - case <-ctx.Done(): - return - case packet := <-c.sendQueue: - c.send(packet) - } - } -} - -func (c *AsyncClient) send(packet DHCPv6) { - transactionID, err := GetTransactionID(packet) - if err != nil { - c.addError(fmt.Errorf("Warning: This should never happen, there is no transaction ID on %s", packet)) - return - } - c.packetsLock.Lock() - f := c.packets[transactionID] - c.packetsLock.Unlock() - - raddr, err := c.remoteAddr() - if err != nil { - f <- NewResponse(nil, err) - return - } - - c.connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) - _, err = c.connection.WriteTo(packet.ToBytes(), raddr) - if err != nil { - f <- NewResponse(nil, err) - return - } - - c.receiveQueue <- packet -} - -func (c *AsyncClient) receive(_ DHCPv6) { - var ( - oobdata = []byte{} - received DHCPv6 - ) - - c.connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)) - for { - buffer := make([]byte, maxUDPReceivedPacketSize) - n, _, _, _, err := c.connection.ReadMsgUDP(buffer, oobdata) - if err != nil { - if err, ok := err.(net.Error); !ok || !err.Timeout() { - c.addError(fmt.Errorf("Error receiving the message: %s", err)) - } - return - } - received, err = FromBytes(buffer[:n]) - if err != nil { - // skip non-DHCP packets - continue - } - break - } - - transactionID, err := GetTransactionID(received) - if err != nil { - c.addError(fmt.Errorf("Unable to get a transactionID for %s: %s", received, err)) - return - } - - c.packetsLock.Lock() - if f, ok := c.packets[transactionID]; ok { - delete(c.packets, transactionID) - f <- NewResponse(received, nil) - } - c.packetsLock.Unlock() -} - -func (c *AsyncClient) remoteAddr() (*net.UDPAddr, error) { - if c.RemoteAddr == nil { - return &net.UDPAddr{IP: AllDHCPRelayAgentsAndServers, Port: DefaultServerPort}, nil - } - - if addr, ok := c.RemoteAddr.(*net.UDPAddr); ok { - return addr, nil - } - return nil, fmt.Errorf("Invalid remote address: %v not a net.UDPAddr", c.RemoteAddr) -} - -// Send inserts a message to the queue to be sent asynchronously. -// Returns a future which resolves to response and error. -func (c *AsyncClient) Send(message DHCPv6, modifiers ...Modifier) Future { - for _, mod := range modifiers { - message = mod(message) - } - - transactionID, err := GetTransactionID(message) - if err != nil { - return NewFailureFuture(err) - } - - f := NewFuture() - c.packetsLock.Lock() - c.packets[transactionID] = f - c.packetsLock.Unlock() - c.sendQueue <- message - return f -} - -// Exchange executes asynchronously a 4-way DHCPv6 request (SOLICIT, -// ADVERTISE, REQUEST, REPLY). -func (c *AsyncClient) Exchange(solicit DHCPv6, modifiers ...Modifier) Future { - return c.Send(solicit).OnSuccess(func(advertise DHCPv6) Future { - request, err := NewRequestFromAdvertise(advertise) - if err != nil { - return NewFailureFuture(err) - } - return c.Send(request, modifiers...) - }) -} diff --git a/dhcpv6/async/client.go b/dhcpv6/async/client.go new file mode 100644 index 0000000..08c2cfb --- /dev/null +++ b/dhcpv6/async/client.go @@ -0,0 +1,226 @@ +package async + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/fanliao/go-promise" + "github.com/insomniacslk/dhcp/dhcpv6" +) + +// Client implements an asynchronous DHCPv6 client +type Client struct { + ReadTimeout time.Duration + WriteTimeout time.Duration + LocalAddr net.Addr + RemoteAddr net.Addr + IgnoreErrors bool + + connection *net.UDPConn + cancel context.CancelFunc + stopping *sync.WaitGroup + receiveQueue chan dhcpv6.DHCPv6 + sendQueue chan dhcpv6.DHCPv6 + packetsLock sync.Mutex + packets map[uint32]*promise.Promise + errors chan error +} + +// NewClient creates an asynchronous client +func NewClient() *Client { + return &Client{ + ReadTimeout: dhcpv6.DefaultReadTimeout, + WriteTimeout: dhcpv6.DefaultWriteTimeout, + } +} + +// OpenForInterface starts the client on the specified interface, replacing +// client LocalAddr with a link-local address of the given interface and +// standard DHCP port (546). +func (c *Client) OpenForInterface(ifname string, bufferSize int) error { + addr, err := dhcpv6.GetLinkLocalAddr(ifname) + if err != nil { + return err + } + c.LocalAddr = &net.UDPAddr{IP: *addr, Port: dhcpv6.DefaultClientPort, Zone: ifname} + return c.Open(bufferSize) +} + +// Open starts the client +func (c *Client) Open(bufferSize int) error { + var ( + addr *net.UDPAddr + ok bool + err error + ) + + if addr, ok = c.LocalAddr.(*net.UDPAddr); !ok { + return fmt.Errorf("Invalid local address: %v not a net.UDPAddr", c.LocalAddr) + } + + // prepare the socket to listen on for replies + c.connection, err = net.ListenUDP("udp6", addr) + if err != nil { + return err + } + c.stopping = new(sync.WaitGroup) + c.sendQueue = make(chan dhcpv6.DHCPv6, bufferSize) + c.receiveQueue = make(chan dhcpv6.DHCPv6, bufferSize) + c.packets = make(map[uint32]*promise.Promise) + c.packetsLock = sync.Mutex{} + c.errors = make(chan error) + + var ctx context.Context + ctx, c.cancel = context.WithCancel(context.Background()) + go c.receiverLoop(ctx) + go c.senderLoop(ctx) + + return nil +} + +// Close stops the client +func (c *Client) Close() { + // Wait for sender and receiver loops + c.stopping.Add(2) + c.cancel() + c.stopping.Wait() + + close(c.sendQueue) + close(c.receiveQueue) + close(c.errors) + + c.connection.Close() +} + +// Errors returns a channel where runtime errors are posted +func (c *Client) Errors() <-chan error { + return c.errors +} + +func (c *Client) addError(err error) { + if !c.IgnoreErrors { + c.errors <- err + } +} + +func (c *Client) receiverLoop(ctx context.Context) { + defer func() { c.stopping.Done() }() + for { + select { + case <-ctx.Done(): + return + case packet := <-c.receiveQueue: + c.receive(packet) + } + } +} + +func (c *Client) senderLoop(ctx context.Context) { + defer func() { c.stopping.Done() }() + for { + select { + case <-ctx.Done(): + return + case packet := <-c.sendQueue: + c.send(packet) + } + } +} + +func (c *Client) send(packet dhcpv6.DHCPv6) { + transactionID, err := dhcpv6.GetTransactionID(packet) + if err != nil { + c.addError(fmt.Errorf("Warning: This should never happen, there is no transaction ID on %s", packet)) + return + } + c.packetsLock.Lock() + p := c.packets[transactionID] + c.packetsLock.Unlock() + + raddr, err := c.remoteAddr() + if err != nil { + p.Reject(err) + return + } + + c.connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + _, err = c.connection.WriteTo(packet.ToBytes(), raddr) + if err != nil { + p.Reject(err) + return + } + + c.receiveQueue <- packet +} + +func (c *Client) receive(_ dhcpv6.DHCPv6) { + var ( + oobdata = []byte{} + received dhcpv6.DHCPv6 + ) + + c.connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + for { + buffer := make([]byte, dhcpv6.MaxUDPReceivedPacketSize) + n, _, _, _, err := c.connection.ReadMsgUDP(buffer, oobdata) + if err != nil { + if err, ok := err.(net.Error); !ok || !err.Timeout() { + c.addError(fmt.Errorf("Error receiving the message: %s", err)) + } + return + } + received, err = dhcpv6.FromBytes(buffer[:n]) + if err != nil { + // skip non-DHCP packets + continue + } + break + } + + transactionID, err := dhcpv6.GetTransactionID(received) + if err != nil { + c.addError(fmt.Errorf("Unable to get a transactionID for %s: %s", received, err)) + return + } + + c.packetsLock.Lock() + if p, ok := c.packets[transactionID]; ok { + delete(c.packets, transactionID) + p.Resolve(received) + } + c.packetsLock.Unlock() +} + +func (c *Client) remoteAddr() (*net.UDPAddr, error) { + if c.RemoteAddr == nil { + return &net.UDPAddr{IP: dhcpv6.AllDHCPRelayAgentsAndServers, Port: dhcpv6.DefaultServerPort}, nil + } + + if addr, ok := c.RemoteAddr.(*net.UDPAddr); ok { + return addr, nil + } + return nil, fmt.Errorf("Invalid remote address: %v not a net.UDPAddr", c.RemoteAddr) +} + +// Send inserts a message to the queue to be sent asynchronously. +// Returns a future which resolves to response and error. +func (c *Client) Send(message dhcpv6.DHCPv6, modifiers ...dhcpv6.Modifier) *promise.Future { + for _, mod := range modifiers { + message = mod(message) + } + + transactionID, err := dhcpv6.GetTransactionID(message) + if err != nil { + return promise.Wrap(err) + } + + p := promise.NewPromise() + c.packetsLock.Lock() + c.packets[transactionID] = p + c.packetsLock.Unlock() + c.sendQueue <- message + return p.Future +} diff --git a/dhcpv6/async/client_test.go b/dhcpv6/async/client_test.go new file mode 100644 index 0000000..8665589 --- /dev/null +++ b/dhcpv6/async/client_test.go @@ -0,0 +1,122 @@ +package async + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/insomniacslk/dhcp/dhcpv6" + "github.com/insomniacslk/dhcp/iana" +) + + +// solicit creates new solicit based on the mac address +func solicit(input string) (dhcpv6.DHCPv6, error) { + mac, err := net.ParseMAC(input) + if err != nil { + return nil, err + } + duid := dhcpv6.Duid{ + Type: dhcpv6.DUID_LLT, + HwType: iana.HwTypeEthernet, + Time: dhcpv6.GetTime(), + LinkLayerAddr: mac, + } + return dhcpv6.NewSolicitWithCID(duid) +} + +// server creates a server which responds with predefined answers +func serve(t *testing.T, addr *net.UDPAddr, responses ...dhcpv6.DHCPv6) { + conn, err := net.ListenUDP("udp6", addr) + require.NoError(t, err) + defer conn.Close() + oobdata := []byte{} + buffer := make([]byte, dhcpv6.MaxUDPReceivedPacketSize) + for _, packet := range responses { + n, _, _, src, err := conn.ReadMsgUDP(buffer, oobdata) + require.NoError(t, err) + _, err = dhcpv6.FromBytes(buffer[:n]) + require.NoError(t, err) + _, err = conn.WriteTo(packet.ToBytes(), src) + require.NoError(t, err) + } +} + +func TestNewClient(t *testing.T) { + c := NewClient() + require.NotNil(t, c) + require.Equal(t, c.ReadTimeout, dhcpv6.DefaultReadTimeout) + require.Equal(t, c.ReadTimeout, dhcpv6.DefaultWriteTimeout) +} + +func TestOpenInvalidAddrFailes(t *testing.T) { + c := NewClient() + err := c.Open(512) + require.Error(t, err) +} + +// This test uses port 15438 so please make sure its not used before running +func TestOpenClose(t *testing.T) { + c := NewClient() + addr, err := net.ResolveUDPAddr("udp6", ":15438") + require.NoError(t, err) + c.LocalAddr = addr + err = c.Open(512) + require.NoError(t, err) + defer c.Close() +} + +// This test uses ports 15438 and 15439 so please make sure they are not used +// before running +func TestSendTimeout(t *testing.T) { + c := NewClient() + addr, err := net.ResolveUDPAddr("udp6", ":15438") + require.NoError(t, err) + remote, err := net.ResolveUDPAddr("udp6", ":15439") + require.NoError(t, err) + c.ReadTimeout = 50 * time.Millisecond + c.WriteTimeout = 50 * time.Millisecond + c.LocalAddr = addr + c.RemoteAddr = remote + err = c.Open(512) + require.NoError(t, err) + defer c.Close() + m, err := dhcpv6.NewMessage() + require.NoError(t, err) + _, err, timeout := c.Send(m).GetOrTimeout(200) + require.NoError(t, err) + require.True(t, timeout) +} + +// This test uses ports 15438 and 15439 so please make sure they are not used +// before running +func TestSend(t *testing.T) { + s, err := solicit("c8:6c:2c:47:96:fd") + require.NoError(t, err) + require.NotNil(t, s) + + a, err := dhcpv6.NewAdvertiseFromSolicit(s) + require.NoError(t, err) + require.NotNil(t, a) + + c := NewClient() + addr, err := net.ResolveUDPAddr("udp6", ":15438") + require.NoError(t, err) + remote, err := net.ResolveUDPAddr("udp6", ":15439") + require.NoError(t, err) + c.LocalAddr = addr + c.RemoteAddr = remote + + go serve(t, remote, a) + + err = c.Open(16) + require.NoError(t, err) + defer c.Close() + + f := c.Send(s) + response, err, timeout := f.GetOrTimeout(1000) + require.False(t, timeout) + require.NoError(t, err) + require.Equal(t, a, response) +} diff --git a/dhcpv6/async_test.go b/dhcpv6/async_test.go deleted file mode 100644 index 4f5a750..0000000 --- a/dhcpv6/async_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package dhcpv6 - -import ( - "net" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestNewAsyncClient(t *testing.T) { - c := NewAsyncClient() - require.NotNil(t, c) - require.Equal(t, c.ReadTimeout, DefaultReadTimeout) - require.Equal(t, c.ReadTimeout, DefaultWriteTimeout) -} - -func TestOpenInvalidAddrFailes(t *testing.T) { - c := NewAsyncClient() - err := c.Open(512) - require.Error(t, err) -} - -// This test uses port 15438 so please make sure its not used before running -func TestOpenClose(t *testing.T) { - c := NewAsyncClient() - addr, err := net.ResolveUDPAddr("udp6", ":15438") - require.NoError(t, err) - c.LocalAddr = addr - err = c.Open(512) - require.NoError(t, err) - defer c.Close() -} - -// This test uses ports 15438 and 15439 so please make sure they are not used -// before running -func TestSendTimeout(t *testing.T) { - c := NewAsyncClient() - addr, err := net.ResolveUDPAddr("udp6", ":15438") - require.NoError(t, err) - remote, err := net.ResolveUDPAddr("udp6", ":15439") - require.NoError(t, err) - c.ReadTimeout = 50 * time.Millisecond - c.WriteTimeout = 50 * time.Millisecond - c.LocalAddr = addr - c.RemoteAddr = remote - err = c.Open(512) - require.NoError(t, err) - defer c.Close() - m, err := NewMessage() - require.NoError(t, err) - _, err = c.Send(m).WaitTimeout(200 * time.Millisecond) - require.Error(t, err) -} diff --git a/dhcpv6/client.go b/dhcpv6/client.go index 3492a47..3ed7861 100644 --- a/dhcpv6/client.go +++ b/dhcpv6/client.go @@ -11,7 +11,7 @@ const ( DefaultWriteTimeout = 3 * time.Second // time to wait for write calls DefaultReadTimeout = 3 * time.Second // time to wait for read calls DefaultInterfaceUpTimeout = 3 * time.Second // time to wait before a network interface goes up - maxUDPReceivedPacketSize = 8192 // arbitrary size. Theoretically could be up to 65kb + MaxUDPReceivedPacketSize = 8192 // arbitrary size. Theoretically could be up to 65kb ) // Broadcast destination IP addresses as defined by RFC 3315 @@ -148,7 +148,7 @@ func (c *Client) sendReceive(ifname string, packet DHCPv6, expectedType MessageT isMessage = true } for { - buf := make([]byte, maxUDPReceivedPacketSize) + buf := make([]byte, MaxUDPReceivedPacketSize) n, _, _, _, err := conn.ReadMsgUDP(buf, oobdata) if err != nil { return nil, err diff --git a/dhcpv6/future.go b/dhcpv6/future.go deleted file mode 100644 index d0ae6cd..0000000 --- a/dhcpv6/future.go +++ /dev/null @@ -1,111 +0,0 @@ -package dhcpv6 - -import ( - "errors" - "time" -) - -// Response represents a value which Future resolves to -type Response interface { - Value() DHCPv6 - Error() error -} - -// Future is a result of an asynchronous DHCPv6 call -type Future (<-chan Response) - -// SuccessFun can be used as a success callback -type SuccessFun func(val DHCPv6) Future - -// FailureFun can be used as a failure callback -type FailureFun func(err error) Future - -type response struct { - val DHCPv6 - err error -} - -func (r *response) Value() DHCPv6 { - return r.val -} - -func (r *response) Error() error { - return r.err -} - -// NewFuture creates a new future, which can be written to -func NewFuture() chan Response { - return make(chan Response, 1) -} - -// NewResponse creates a new future response -func NewResponse(val DHCPv6, err error) Response { - return &response{val: val, err: err} -} - -// NewSuccessFuture creates a future that resolves to a value -func NewSuccessFuture(val DHCPv6) Future { - f := NewFuture() - go func() { - f <- NewResponse(val, nil) - }() - return f -} - -// NewFailureFuture creates a future that resolves to an error -func NewFailureFuture(err error) Future { - f := NewFuture() - go func() { - f <- NewResponse(nil, err) - }() - return f -} - -// Then allows to chain the futures executing appropriate function depending -// on the previous future value -func (f Future) Then(success SuccessFun, failure FailureFun) Future { - g := NewFuture() - go func() { - r := <-f - if r.Error() != nil { - r = <-failure(r.Error()) - g <- r - } else { - r = <-success(r.Value()) - g <- r - } - }() - return g -} - -// OnSuccess allows to chain the futures executing next one only if the first -// one succeeds -func (f Future) OnSuccess(success SuccessFun) Future { - return f.Then(success, func(err error) Future { - return NewFailureFuture(err) - }) -} - -// OnFailure allows to chain the futures executing next one only if the first -// one fails -func (f Future) OnFailure(failure FailureFun) Future { - return f.Then(func(val DHCPv6) Future { - return NewSuccessFuture(val) - }, failure) -} - -// Wait blocks the execution until a future resolves -func (f Future) Wait() (DHCPv6, error) { - r := <-f - return r.Value(), r.Error() -} - -// WaitTimeout blocks the execution until a future resolves or times out -func (f Future) WaitTimeout(timeout time.Duration) (DHCPv6, error) { - select { - case r := <-f: - return r.Value(), r.Error() - case <-time.After(timeout): - return nil, errors.New("Timed out") - } -} diff --git a/dhcpv6/future_test.go b/dhcpv6/future_test.go deleted file mode 100644 index bee87e3..0000000 --- a/dhcpv6/future_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package dhcpv6 - -import ( - "errors" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestResponseValue(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - r := NewResponse(m, nil) - require.Equal(t, r.Value(), m) - require.Equal(t, r.Error(), nil) -} - -func TestResponseError(t *testing.T) { - e := errors.New("Test error") - r := NewResponse(nil, e) - require.Equal(t, r.Value(), nil) - require.Equal(t, r.Error(), e) -} - -func TestSuccessFuture(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - f := NewSuccessFuture(m) - - val, err := f.Wait() - require.NoError(t, err) - require.Equal(t, val, m) -} - -func TestFailureFuture(t *testing.T) { - e := errors.New("Test error") - f := NewFailureFuture(e) - - val, err := f.Wait() - require.Equal(t, err, e) - require.Equal(t, val, nil) -} - -func TestThenSuccess(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - s, err := NewMessage() - require.NoError(t, err) - e := errors.New("Test error") - - f := NewSuccessFuture(m). - Then(func(_ DHCPv6) Future { - return NewSuccessFuture(s) - }, func(_ error) Future { - return NewFailureFuture(e) - }) - - val, err := f.Wait() - require.NoError(t, err) - require.NotEqual(t, val, m) - require.Equal(t, val, s) -} - -func TestThenFailure(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - s, err := NewMessage() - require.NoError(t, err) - e := errors.New("Test error") - e2 := errors.New("Test error 2") - - f := NewFailureFuture(e). - Then(func(_ DHCPv6) Future { - return NewSuccessFuture(s) - }, func(_ error) Future { - return NewFailureFuture(e2) - }) - - val, err := f.Wait() - require.Error(t, err) - require.NotEqual(t, val, m) - require.NotEqual(t, val, s) - require.NotEqual(t, err, e) - require.Equal(t, err, e2) -} - -func TestOnSuccess(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - s, err := NewMessage() - require.NoError(t, err) - - f := NewSuccessFuture(m). - OnSuccess(func(_ DHCPv6) Future { - return NewSuccessFuture(s) - }) - - val, err := f.Wait() - require.NoError(t, err) - require.NotEqual(t, val, m) - require.Equal(t, val, s) -} - -func TestOnSuccessForFailureFuture(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - e := errors.New("Test error") - - f := NewFailureFuture(e). - OnSuccess(func(_ DHCPv6) Future { - return NewSuccessFuture(m) - }) - - val, err := f.Wait() - require.Error(t, err) - require.Equal(t, err, e) - require.NotEqual(t, val, m) -} - -func TestOnFailure(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - s, err := NewMessage() - require.NoError(t, err) - e := errors.New("Test error") - - f := NewFailureFuture(e). - OnFailure(func(_ error) Future { - return NewSuccessFuture(s) - }) - - val, err := f.Wait() - require.NoError(t, err) - require.NotEqual(t, val, m) - require.Equal(t, val, s) -} - -func TestOnFailureForSuccessFuture(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - s, err := NewMessage() - require.NoError(t, err) - - f := NewSuccessFuture(m). - OnFailure(func(_ error) Future { - return NewSuccessFuture(s) - }) - - val, err := f.Wait() - require.NoError(t, err) - require.NotEqual(t, val, s) - require.Equal(t, val, m) -} - -func TestWaitTimeout(t *testing.T) { - m, err := NewMessage() - require.NoError(t, err) - s, err := NewMessage() - require.NoError(t, err) - f := NewSuccessFuture(m).OnSuccess(func(_ DHCPv6) Future { - time.Sleep(1 * time.Second) - return NewSuccessFuture(s) - }) - val, err := f.WaitTimeout(50 * time.Millisecond) - require.Error(t, err) - require.Equal(t, err.Error(), "Timed out") - require.NotEqual(t, val, m) - require.NotEqual(t, val, s) -} -- cgit v1.2.3