summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/tests/packet_data/pcap/big_endian.pcapbin0 -> 78 bytes
-rw-r--r--ryu/tests/packet_data/pcap/little_endian.pcapbin0 -> 78 bytes
-rw-r--r--ryu/tests/unit/lib/test_pcaplib.py230
3 files changed, 230 insertions, 0 deletions
diff --git a/ryu/tests/packet_data/pcap/big_endian.pcap b/ryu/tests/packet_data/pcap/big_endian.pcap
new file mode 100644
index 00000000..1387f5dd
--- /dev/null
+++ b/ryu/tests/packet_data/pcap/big_endian.pcap
Binary files differ
diff --git a/ryu/tests/packet_data/pcap/little_endian.pcap b/ryu/tests/packet_data/pcap/little_endian.pcap
new file mode 100644
index 00000000..11072bdd
--- /dev/null
+++ b/ryu/tests/packet_data/pcap/little_endian.pcap
Binary files differ
diff --git a/ryu/tests/unit/lib/test_pcaplib.py b/ryu/tests/unit/lib/test_pcaplib.py
new file mode 100644
index 00000000..ac86904e
--- /dev/null
+++ b/ryu/tests/unit/lib/test_pcaplib.py
@@ -0,0 +1,230 @@
+# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import logging
+import os
+import struct
+import sys
+import unittest
+
+try:
+ import mock # Python 2
+except ImportError:
+ from unittest import mock # Python 3
+
+from nose.tools import eq_
+from nose.tools import raises
+
+from ryu.utils import binary_str
+from ryu.lib import pcaplib
+
+LOG = logging.getLogger(__name__)
+
+PCAP_PACKET_DATA_DIR = os.path.join(
+ os.path.dirname(sys.modules[__name__].__file__),
+ '../../packet_data/pcap/')
+
+
+class Test_PcapFileHdr(unittest.TestCase):
+ """
+ Test case for pcaplib.PcapFileHdr class
+ """
+ hdr = pcaplib.PcapFileHdr(
+ magic=None, # temporary default
+ version_major=2,
+ version_minor=4,
+ thiszone=0x11223344,
+ sigfigs=0x22334455,
+ snaplen=0x33445566,
+ network=0x44556677,
+ )
+
+ buf_big = (
+ b'\xa1\xb2\xc3\xd4' # magic (Big Endian)
+ b'\x00\x02\x00\x04' # version_major, version_minor
+ b'\x11\x22\x33\x44' # thiszone
+ b'\x22\x33\x44\x55' # sigfigs
+ b'\x33\x44\x55\x66' # snaplen
+ b'\x44\x55\x66\x77' # network
+ )
+
+ buf_little = (
+ b'\xd4\xc3\xb2\xa1' # magic (Little Endian)
+ b'\x02\x00\x04\x00' # version_major, version_minor
+ b'\x44\x33\x22\x11' # thiszone
+ b'\x55\x44\x33\x22' # sigfigs
+ b'\x66\x55\x44\x33' # snaplen
+ b'\x77\x66\x55\x44' # network
+ )
+
+ buf_invalid = (
+ b'\xff\xff\xff\xff' # magic (Invalid)
+ b'\x02\x00\x04\x00' # version_major, version_minor
+ b'\x44\x33\x22\x11' # thiszone
+ b'\x55\x44\x33\x22' # sigfigs
+ b'\x66\x55\x44\x33' # snaplen
+ b'\x77\x66\x55\x44' # network
+ )
+
+ def _assert(self, magic, ret):
+ self.hdr.magic = magic
+ eq_(self.hdr.__dict__, ret.__dict__)
+
+ def test_parser_with_big_endian(self):
+ ret, byteorder = pcaplib.PcapFileHdr.parser(self.buf_big)
+ self._assert(pcaplib.PcapFileHdr.MAGIC_NUMBER_IDENTICAL, ret)
+ eq_('big', byteorder)
+
+ def test_parser_with_little_endian(self):
+ ret, byteorder = pcaplib.PcapFileHdr.parser(self.buf_little)
+ self._assert(pcaplib.PcapFileHdr.MAGIC_NUMBER_SWAPPED, ret)
+ eq_('little', byteorder)
+
+ @mock.patch('sys.byteorder', 'big')
+ def test_serialize_with_big_endian(self):
+ buf = self.hdr.serialize()
+ eq_(binary_str(self.buf_big), binary_str(buf))
+
+ @mock.patch('sys.byteorder', 'little')
+ def test_serialize_with_little_endian(self):
+ buf = self.hdr.serialize()
+ eq_(binary_str(self.buf_little), binary_str(buf))
+
+ @raises(struct.error)
+ def test_parser_with_invalid_magic_number(self):
+ pcaplib.PcapFileHdr.parser(self.buf_invalid)
+
+
+class Test_PcapPktHdr(unittest.TestCase):
+ """
+ Test case for pcaplib.PcapPktHdr class
+ """
+ expected_buf = b'test_data'
+
+ hdr = pcaplib.PcapPktHdr(
+ ts_sec=0x11223344,
+ ts_usec=0x22334455,
+ incl_len=len(expected_buf),
+ orig_len=0x44556677,
+ )
+
+ buf_big = (
+ b'\x11\x22\x33\x44' # ts_sec
+ b'\x22\x33\x44\x55' # ts_usec
+ b'\x00\x00\x00\x09' # incl_len = len(expected_buf)
+ b'\x44\x55\x66\x77' # orig_len
+ )
+
+ buf_little = (
+ b'\x44\x33\x22\x11' # ts_sec
+ b'\x55\x44\x33\x22' # ts_usec
+ b'\x09\x00\x00\x00' # incl_len = len(expected_buf)
+ b'\x77\x66\x55\x44' # orig_len
+ )
+
+ def test_parser_with_big_endian(self):
+ ret, buf = pcaplib.PcapPktHdr.parser(
+ self.buf_big + self.expected_buf, 'big')
+ eq_(self.hdr.__dict__, ret.__dict__)
+ eq_(self.expected_buf, buf)
+
+ def test_parser_with_little_endian(self):
+ ret, buf = pcaplib.PcapPktHdr.parser(
+ self.buf_little + self.expected_buf, 'little')
+ eq_(self.hdr.__dict__, ret.__dict__)
+ eq_(self.expected_buf, buf)
+
+ @mock.patch('sys.byteorder', 'big')
+ def test_serialize_with_big_endian(self):
+ buf = self.hdr.serialize()
+ eq_(binary_str(self.buf_big), binary_str(buf))
+
+ @mock.patch('sys.byteorder', 'little')
+ def test_serialize_with_little_endian(self):
+ buf = self.hdr.serialize()
+ eq_(binary_str(self.buf_little), binary_str(buf))
+
+
+class Test_pcaplib_Reader(unittest.TestCase):
+ """
+ Test case for pcaplib.Reader class
+ """
+
+ expected_outputs = [
+ (0x1234 + (0x5678 / 1e6), b'test_data_1'), # sec=0x1234, usec=0x5678
+ (0x2345 + (0x6789 / 1e6), b'test_data_2'), # sec=0x2345, usec=0x6789
+ ]
+
+ def _test(self, file_name):
+ outputs = []
+ for ts, buf in pcaplib.Reader(open(file_name, 'rb')):
+ outputs.append((ts, buf))
+
+ eq_(self.expected_outputs, outputs)
+
+ def test_with_big_endian(self):
+ self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'big_endian.pcap'))
+
+ def test_with_little_endian(self):
+ self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'little_endian.pcap'))
+
+
+class DummyFile(object):
+
+ def __init__(self):
+ self.buf = b''
+
+ def write(self, buf):
+ self.buf += buf
+
+ def close(self):
+ pass
+
+
+class Test_pcaplib_Writer(unittest.TestCase):
+ """
+ Test case for pcaplib.Writer class
+ """
+
+ @staticmethod
+ def _test(file_name):
+ expected_buf = open(file_name, 'rb').read()
+ f = DummyFile()
+ w = pcaplib.Writer(f)
+ w.write_pkt(b'test_data_1', ts=(0x1234 + (0x5678 / 1e6)))
+ w.write_pkt(b'test_data_2', ts=(0x2345 + (0x6789 / 1e6)))
+ eq_(expected_buf, f.buf)
+
+ @mock.patch('sys.byteorder', 'big')
+ def test_with_big_endian(self):
+ self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'big_endian.pcap'))
+
+ @mock.patch('sys.byteorder', 'little')
+ def test_with_little_endian(self):
+ self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'little_endian.pcap'))
+
+ @staticmethod
+ @mock.patch.object(pcaplib.Writer, '_write_pcap_file_hdr', mock.MagicMock)
+ @mock.patch.object(pcaplib.Writer, '_write_pkt_hdr', mock.MagicMock)
+ def test_with_longer_buf():
+ f = DummyFile()
+ snaplen = 4
+ w = pcaplib.Writer(f, snaplen=snaplen)
+ w.write_pkt(b'hogehoge', ts=0)
+ expected_buf = b'hoge' # b'hogehoge'[:snaplen]
+ eq_(expected_buf, f.buf)
+ eq_(snaplen, len(f.buf))