summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authortakahashi.minoru <takahashi.minoru7@gmail.com>2014-06-06 14:32:17 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2014-06-10 21:11:52 +0900
commitcfdce1f34043a0e245999e0515be3bbc3f5f0316 (patch)
tree5dc551a5ecd2817be6b9252af86c8a57d6ce8e5c
parented25eefa846e2f8b7a6683908f224efc2bc0acc9 (diff)
packet lib: add Connectivity Fault Management Protocol(CFM, IEEE 802.1ag)
Signed-off-by: TAKAHASHI Minoru <takahashi.minoru7@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--doc/source/library_packet_ref.rst3
-rw-r--r--ryu/lib/packet/cfm.py1371
-rw-r--r--ryu/lib/packet/vlan.py2
-rw-r--r--ryu/ofproto/ether.py1
-rw-r--r--ryu/tests/unit/packet/test_cfm.py1760
5 files changed, 3137 insertions, 0 deletions
diff --git a/doc/source/library_packet_ref.rst b/doc/source/library_packet_ref.rst
index a39aa563..7048e72c 100644
--- a/doc/source/library_packet_ref.rst
+++ b/doc/source/library_packet_ref.rst
@@ -50,6 +50,9 @@ Protocol Header classes
.. automodule:: ryu.lib.packet.icmpv6
:members:
+.. automodule:: ryu.lib.packet.cfm
+ :members:
+
.. automodule:: ryu.lib.packet.tcp
:members:
diff --git a/ryu/lib/packet/cfm.py b/ryu/lib/packet/cfm.py
new file mode 100644
index 00000000..8e5717e5
--- /dev/null
+++ b/ryu/lib/packet/cfm.py
@@ -0,0 +1,1371 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import six
+import struct
+from ryu.lib import addrconv
+from ryu.lib import stringify
+from ryu.lib.packet import packet_base
+
+# IEEE 802.1ag OpCode
+CFM_CC_MESSAGE = 0x01
+CFM_LOOPBACK_REPLY = 0x02
+CFM_LOOPBACK_MESSAGE = 0x03
+CFM_LINK_TRACE_REPLY = 0x04
+CFM_LINK_TRACE_MESSAGE = 0x05
+
+# IEEE 802.1ag TLV type
+CFM_END_TLV = 0x00
+CFM_SENDER_ID_TLV = 0x01
+CFM_PORT_STATUS_TLV = 0x02
+CFM_DATA_TLV = 0x03
+CFM_INTERFACE_STATUS_TLV = 0x04
+CFM_REPLY_INGRESS_TLV = 0x05
+CFM_REPLY_EGRESS_TLV = 0x06
+CFM_LTM_EGRESS_IDENTIFIER_TLV = 0x07
+CFM_LTR_EGRESS_IDENTIFIER_TLV = 0x08
+CFM_ORGANIZATION_SPECIFIC_TLV = 0x1f
+
+# IEEE 802.1ag CFM version
+CFM_VERSION = 0
+
+
+class cfm(packet_base.PacketBase):
+ """CFM (Connectivity Fault Management) Protocol header class.
+
+ http://standards.ieee.org/getieee802/download/802.1ag-2007.pdf
+
+ OpCode Field range assignments
+
+ +---------------+--------------------------------------------------+
+ | OpCode range | CFM PDU or organization |
+ +===============+==================================================+
+ | 0 | Reserved for IEEE 802.1 |
+ +---------------+--------------------------------------------------+
+ | 1 | Continuity Check Message (CCM) |
+ +---------------+--------------------------------------------------+
+ | 2 | Loopback Reply (LBR) |
+ +---------------+--------------------------------------------------+
+ | 3 | Loopback Message (LBM) |
+ +---------------+--------------------------------------------------+
+ | 4 | Linktrace Reply (LTR) |
+ +---------------+--------------------------------------------------+
+ | 5 | Linktrace Message (LTM) |
+ +---------------+--------------------------------------------------+
+ | 06 - 31 | Reserved for IEEE 802.1 |
+ +---------------+--------------------------------------------------+
+ | 32 - 63 | Defined by ITU-T Y.1731 |
+ +---------------+--------------------------------------------------+
+ | 64 - 255 | Reserved for IEEE 802.1. |
+ +---------------+--------------------------------------------------+
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ============== ========================================
+ Attribute Description
+ ============== ========================================
+ op CFM PDU
+ ============== ========================================
+
+ """
+ _PACK_STR = '!B'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _CFM_OPCODE = {}
+ _TYPE = {
+ 'ascii': [
+ 'ltm_orig_addr', 'ltm_targ_addr'
+ ]
+ }
+
+ @staticmethod
+ def register_cfm_opcode(type_):
+ def _register_cfm_opcode(cls):
+ cfm._CFM_OPCODE[type_] = cls
+ return cls
+ return _register_cfm_opcode
+
+ def __init__(self, op=None):
+ super(cfm, self).__init__()
+ assert isinstance(op, operation)
+ self.op = op
+
+ @classmethod
+ def parser(cls, buf):
+ (opcode, ) = struct.unpack_from(cls._PACK_STR, buf, cls._MIN_LEN)
+ cls_ = cls._CFM_OPCODE.get(opcode)
+ op = cls_.parser(buf)
+ instance = cls(op)
+ rest = buf[len(instance):]
+ return instance, None, rest
+
+ def serialize(self, payload, prev):
+ buf = self.op.serialize()
+ return buf
+
+ def __len__(self):
+ return len(self.op)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class operation(stringify.StringifyMixin):
+
+ _TLV_TYPES = {}
+ _END_TLV_LEN = 1
+
+ @staticmethod
+ def register_tlv_types(type_):
+ def _register_tlv_types(cls):
+ operation._TLV_TYPES[type_] = cls
+ return cls
+ return _register_tlv_types
+
+ def __init__(self, md_lv, version, tlvs):
+ self.md_lv = md_lv
+ self.version = version
+ tlvs = tlvs or []
+ assert isinstance(tlvs, list)
+ for tlv_ in tlvs:
+ assert isinstance(tlv_, tlv)
+ self.tlvs = tlvs
+
+ @classmethod
+ @abc.abstractmethod
+ def parser(cls, buf):
+ pass
+
+ @abc.abstractmethod
+ def serialize(self):
+ pass
+
+ @abc.abstractmethod
+ def __len__(self):
+ pass
+
+ @classmethod
+ def _parser_tlvs(cls, buf):
+ offset = 0
+ tlvs = []
+ while True:
+ (type_, ) = struct.unpack_from('!B', buf, offset)
+ cls_ = cls._TLV_TYPES.get(type_)
+ if not cls_:
+ assert type_ is CFM_END_TLV
+ break
+ tlv_ = cls_.parser(buf[offset:])
+ tlvs.append(tlv_)
+ offset += len(tlv_)
+ return tlvs
+
+ @staticmethod
+ def _serialize_tlvs(tlvs):
+ buf = bytearray()
+ for tlv_ in tlvs:
+ buf.extend(tlv_.serialize())
+ return buf
+
+ def _calc_len(self, len_):
+ for tlv_ in self.tlvs:
+ len_ += len(tlv_)
+ len_ += self._END_TLV_LEN
+ return len_
+
+
+@cfm.register_cfm_opcode(CFM_CC_MESSAGE)
+class cc_message(operation):
+
+ """CFM (IEEE Std 802.1ag-2007) Continuity Check Message (CCM)
+ encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ md_lv Maintenance Domain Level.
+ version The protocol version number.
+ rdi RDI bit.
+ interval CCM Interval.The default is 4 (1 frame/s)
+ seq_num Sequence Number.
+ mep_id Maintenance association End Point Identifier.
+ md_name_format Maintenance Domain Name Format.
+ The default is 4 (Character string)
+ md_name_length Maintenance Domain Name Length.
+ (0 means automatically-calculate
+ when encoding.)
+ md_name Maintenance Domain Name.
+ short_ma_name_format Short MA Name Format.
+ The default is 2 (Character string)
+ short_ma_name_length Short MA Name Format Length.
+ (0 means automatically-calculate
+ when encoding.)
+ short_ma_name Short MA Name.
+ tlvs TLVs.
+ ==================== =======================================
+ """
+
+ _PACK_STR = '!4BIHB'
+
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _TLV_OFFSET = 70
+ _MD_NAME_FORMAT_LEN = 1
+ _MD_NAME_LENGTH_LEN = 1
+ _SHORT_MA_NAME_FORMAT_LEN = 1
+ _SHORT_MA_NAME_LENGTH_LEN = 1
+ _MA_ID_LEN = 64
+
+ # Maintenance Domain Name Format
+ _MD_FMT_NO_MD_NAME_PRESENT = 1
+ _MD_FMT_DOMAIN_NAME_BASED_STRING = 2
+ _MD_FMT_MAC_ADDRESS_TWO_OCTET_INTEGER = 3
+ _MD_FMT_CHARACTER_STRING = 4
+
+ # Short MA Name Format
+ _SHORT_MA_FMT_PRIMARY_VID = 1
+ _SHORT_MA_FMT_CHARACTER_STRING = 2
+ _SHORT_MA_FMT_TWO_OCTET_INTEGER = 3
+ _SHORT_MA_FMT_RFC_2685_VPN_ID = 4
+
+ # CCM Transmission Interval
+ _INTERVAL_300_HZ = 1
+ _INTERVAL_10_MSEC = 2
+ _INTERVAL_100_MSEC = 3
+ _INTERVAL_1_SEC = 4
+ _INTERVAL_10_SEC = 5
+ _INTERVAL_1_MIN = 6
+ _INTERVAL_10_MIN = 6
+
+ def __init__(self, md_lv=0, version=CFM_VERSION,
+ rdi=0, interval=_INTERVAL_1_SEC, seq_num=0, mep_id=1,
+ md_name_format=_MD_FMT_CHARACTER_STRING,
+ md_name_length=0, md_name="0",
+ short_ma_name_format=_SHORT_MA_FMT_CHARACTER_STRING,
+ short_ma_name_length=0, short_ma_name="1",
+ tlvs=None):
+ super(cc_message, self).__init__(md_lv, version, tlvs)
+ self._opcode = CFM_CC_MESSAGE
+ assert rdi in [0, 1]
+ self.rdi = rdi
+ assert interval is not 0
+ self.interval = interval
+ self.seq_num = seq_num
+ assert 1 <= mep_id <= 8191
+ self.mep_id = mep_id
+ self.md_name_format = md_name_format
+ self.md_name_length = md_name_length
+ self.md_name = md_name
+ self.short_ma_name_format = short_ma_name_format
+ self.short_ma_name_length = short_ma_name_length
+ self.short_ma_name = short_ma_name
+
+ @classmethod
+ def parser(cls, buf):
+ (md_lv_version, opcode, flags, tlv_offset, seq_num, mep_id,
+ md_name_format) = struct.unpack_from(cls._PACK_STR, buf)
+ md_name_length = 0
+ md_name = ""
+ md_lv = int(md_lv_version >> 5)
+ version = int(md_lv_version & 0x1f)
+ rdi = int(flags >> 7)
+ interval = int(flags & 0x07)
+ offset = cls._MIN_LEN
+ # parse md_name
+ if md_name_format != cls._MD_FMT_NO_MD_NAME_PRESENT:
+ (md_name_length, ) = struct.unpack_from("!B", buf, offset)
+ offset += cls._MD_NAME_LENGTH_LEN
+ form = "%dB" % md_name_length
+ md_name = struct.unpack_from(form, buf, offset)
+ offset += md_name_length
+ # parse short_ma_name
+ (short_ma_name_format,
+ short_ma_name_length) = struct.unpack_from("!2B", buf, offset)
+ offset += (cls._SHORT_MA_NAME_FORMAT_LEN +
+ cls._SHORT_MA_NAME_LENGTH_LEN)
+ form = "%dB" % short_ma_name_length
+ short_ma_name = struct.unpack_from(form, buf, offset)
+ offset = cls._MIN_LEN + (cls._MA_ID_LEN - cls._MD_NAME_FORMAT_LEN)
+ tlvs = cls._parser_tlvs(buf[offset:])
+ # ascii to text
+ if md_name_format == cls._MD_FMT_DOMAIN_NAME_BASED_STRING or \
+ md_name_format == cls._MD_FMT_CHARACTER_STRING:
+ md_name = "".join(map(chr, md_name))
+ if short_ma_name_format == cls._SHORT_MA_FMT_CHARACTER_STRING:
+ short_ma_name = "".join(map(chr, short_ma_name))
+ return cls(md_lv, version, rdi, interval, seq_num, mep_id,
+ md_name_format, md_name_length,
+ md_name,
+ short_ma_name_format, short_ma_name_length,
+ short_ma_name,
+ tlvs)
+
+ def serialize(self):
+ buf = struct.pack(self._PACK_STR,
+ (self.md_lv << 5) | self.version,
+ self._opcode,
+ (self.rdi << 7) | self.interval,
+ self._TLV_OFFSET,
+ self.seq_num, self.mep_id, self.md_name_format)
+ buf = bytearray(buf)
+ # Maintenance Domain Name
+ if self.md_name_format != self._MD_FMT_NO_MD_NAME_PRESENT:
+ if self.md_name_length == 0:
+ self.md_name_length = len(self.md_name)
+ buf.extend(struct.pack('!B%ds' % self.md_name_length,
+ self.md_name_length, self.md_name))
+ # Short MA Name
+ if self.short_ma_name_length == 0:
+ self.short_ma_name_length = len(self.short_ma_name)
+ buf.extend(struct.pack('!2B%ds' % self.short_ma_name_length,
+ self.short_ma_name_format,
+ self.short_ma_name_length,
+ self.short_ma_name
+ ))
+ # 0 pad
+ maid_length = (self._MD_NAME_FORMAT_LEN +
+ self._SHORT_MA_NAME_FORMAT_LEN +
+ self._SHORT_MA_NAME_LENGTH_LEN +
+ self.short_ma_name_length)
+ if self.md_name_format != self._MD_FMT_NO_MD_NAME_PRESENT:
+ maid_length += self._MD_NAME_LENGTH_LEN + self.md_name_length
+ buf.extend(bytearray(self._MA_ID_LEN - maid_length))
+ # tlvs
+ if self.tlvs:
+ buf.extend(self._serialize_tlvs(self.tlvs))
+ buf.extend(struct.pack("!B", CFM_END_TLV))
+ return buf
+
+ def __len__(self):
+ return self._calc_len(
+ (self._MIN_LEN - self._MD_NAME_FORMAT_LEN) + self._MA_ID_LEN)
+
+
+class loopback(operation):
+
+ _PACK_STR = '!4BI'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _TLV_OFFSET = 4
+
+ @abc.abstractmethod
+ def __init__(self, md_lv, version, transaction_id, tlvs):
+ super(loopback, self).__init__(md_lv, version, tlvs)
+ self._flags = 0
+ self.transaction_id = transaction_id
+
+ @classmethod
+ def parser(cls, buf):
+ (md_lv_version, opcode, flags, tlv_offset,
+ transaction_id) = struct.unpack_from(cls._PACK_STR, buf)
+ md_lv = int(md_lv_version >> 5)
+ version = int(md_lv_version & 0x1f)
+ tlvs = cls._parser_tlvs(buf[cls._MIN_LEN:])
+ return cls(md_lv, version, transaction_id, tlvs)
+
+ def serialize(self):
+ buf = struct.pack(self._PACK_STR,
+ (self.md_lv << 5) | self.version,
+ self._opcode,
+ self._flags,
+ self._TLV_OFFSET,
+ self.transaction_id,
+ )
+ buf = bytearray(buf)
+ # tlvs
+ if self.tlvs:
+ buf.extend(self._serialize_tlvs(self.tlvs))
+ buf.extend(struct.pack("!B", CFM_END_TLV))
+
+ return buf
+
+ def __len__(self):
+ return self._calc_len(self._MIN_LEN)
+
+
+@cfm.register_cfm_opcode(CFM_LOOPBACK_MESSAGE)
+class loopback_message(loopback):
+
+ """CFM (IEEE Std 802.1ag-2007) Loopback Message (LBM) encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ================= =======================================
+ Attribute Description
+ ================= =======================================
+ md_lv Maintenance Domain Level.
+ version The protocol version number.
+ transaction_id Loopback Transaction Identifier.
+ tlvs TLVs.
+ ================= =======================================
+ """
+
+ def __init__(self, md_lv=0, version=CFM_VERSION,
+ transaction_id=0,
+ tlvs=None,
+ ):
+ super(loopback_message, self).__init__(md_lv, version,
+ transaction_id,
+ tlvs)
+ self._opcode = CFM_LOOPBACK_MESSAGE
+
+
+@cfm.register_cfm_opcode(CFM_LOOPBACK_REPLY)
+class loopback_reply(loopback):
+
+ """CFM (IEEE Std 802.1ag-2007) Loopback Reply (LBR) encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ md_lv Maintenance Domain Level.
+ version The protocol version number.
+ transaction_id Loopback Transaction Identifier.
+ tlvs TLVs.
+ ==================== =======================================
+ """
+
+ def __init__(self, md_lv=0, version=CFM_VERSION,
+ transaction_id=0,
+ tlvs=None,
+ ):
+ super(loopback_reply, self).__init__(md_lv, version,
+ transaction_id,
+ tlvs)
+ self._opcode = CFM_LOOPBACK_REPLY
+
+
+class link_trace(operation):
+
+ @abc.abstractmethod
+ def __init__(self, md_lv, version, use_fdb_only,
+ transaction_id, ttl, tlvs):
+ super(link_trace, self).__init__(md_lv, version, tlvs)
+ assert use_fdb_only in [0, 1]
+ self.use_fdb_only = use_fdb_only
+ self.transaction_id = transaction_id
+ self.ttl = ttl
+
+ @classmethod
+ @abc.abstractmethod
+ def parser(cls, buf):
+ pass
+
+ @abc.abstractmethod
+ def serialize(self):
+ pass
+
+ def __len__(self):
+ return self._calc_len(self._MIN_LEN)
+
+
+@cfm.register_cfm_opcode(CFM_LINK_TRACE_MESSAGE)
+class link_trace_message(link_trace):
+
+ """CFM (IEEE Std 802.1ag-2007) Linktrace Message (LTM)
+ encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ md_lv Maintenance Domain Level.
+ version The protocol version number.
+ use_fdb_only UseFDBonly bit.
+ transaction_id LTM Transaction Identifier.
+ ttl LTM TTL.
+ ltm_orig_addr Original MAC Address.
+ ltm_targ_addr Target MAC Address.
+ tlvs TLVs.
+ ==================== =======================================
+ """
+
+ _PACK_STR = '!4BIB6s6s'
+ _ALL_PACK_LEN = struct.calcsize(_PACK_STR)
+ _MIN_LEN = _ALL_PACK_LEN
+ _TLV_OFFSET = 17
+
+ def __init__(self, md_lv=0, version=CFM_VERSION,
+ use_fdb_only=1,
+ transaction_id=0,
+ ttl=64,
+ ltm_orig_addr='00:00:00:00:00:00',
+ ltm_targ_addr='00:00:00:00:00:00',
+ tlvs=None
+ ):
+ super(link_trace_message, self).__init__(md_lv, version,
+ use_fdb_only,
+ transaction_id,
+ ttl,
+ tlvs)
+ self._opcode = CFM_LINK_TRACE_MESSAGE
+ self.ltm_orig_addr = ltm_orig_addr
+ self.ltm_targ_addr = ltm_targ_addr
+
+ @classmethod
+ def parser(cls, buf):
+ (md_lv_version, opcode, flags, tlv_offset, transaction_id, ttl,
+ ltm_orig_addr, ltm_targ_addr) = struct.unpack_from(cls._PACK_STR, buf)
+ md_lv = int(md_lv_version >> 5)
+ version = int(md_lv_version & 0x1f)
+ use_fdb_only = int(flags >> 7)
+ tlvs = cls._parser_tlvs(buf[cls._MIN_LEN:])
+ return cls(md_lv, version, use_fdb_only,
+ transaction_id, ttl,
+ addrconv.mac.bin_to_text(ltm_orig_addr),
+ addrconv.mac.bin_to_text(ltm_targ_addr),
+ tlvs)
+
+ def serialize(self):
+ buf = struct.pack(self._PACK_STR,
+ (self.md_lv << 5) | self.version,
+ self._opcode,
+ self.use_fdb_only << 7,
+ self._TLV_OFFSET,
+ self.transaction_id,
+ self.ttl,
+ addrconv.mac.text_to_bin(self.ltm_orig_addr),
+ addrconv.mac.text_to_bin(self.ltm_targ_addr),
+ )
+ buf = bytearray(buf)
+ if self.tlvs:
+ buf.extend(self._serialize_tlvs(self.tlvs))
+ buf.extend(struct.pack("!B", CFM_END_TLV))
+ return buf
+
+
+@cfm.register_cfm_opcode(CFM_LINK_TRACE_REPLY)
+class link_trace_reply(link_trace):
+
+ """CFM (IEEE Std 802.1ag-2007) Linktrace Reply (LTR) encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ version The protocol version number.
+ use_fdb_only UseFDBonly bit.
+ fwd_yes FwdYes bit.
+ terminal_mep TerminalMep bit.
+ transaction_id LTR Transaction Identifier.
+ ttl Reply TTL.
+ relay_action Relay Action.The default is 1 (RlyHit)
+ tlvs TLVs.
+ ==================== =======================================
+ """
+ _PACK_STR = '!4BIBB'
+ _ALL_PACK_LEN = struct.calcsize(_PACK_STR)
+ _MIN_LEN = _ALL_PACK_LEN
+ _TLV_OFFSET = 6
+
+ # Relay Action field values
+ _RLY_HIT = 1
+ _RLY_FDB = 2
+ _RLY_MPDB = 3
+
+ def __init__(self, md_lv=0, version=CFM_VERSION, use_fdb_only=1,
+ fwd_yes=0, terminal_mep=1, transaction_id=0,
+ ttl=64, relay_action=_RLY_HIT, tlvs=None
+ ):
+ super(link_trace_reply, self).__init__(md_lv, version,
+ use_fdb_only,
+ transaction_id,
+ ttl,
+ tlvs)
+ self._opcode = CFM_LINK_TRACE_REPLY
+ assert fwd_yes in [0, 1]
+ self.fwd_yes = fwd_yes
+ assert terminal_mep in [0, 1]
+ self.terminal_mep = terminal_mep
+ assert relay_action in [self._RLY_HIT, self._RLY_FDB, self._RLY_MPDB]
+ self.relay_action = relay_action
+
+ @classmethod
+ def parser(cls, buf):
+ (md_lv_version, opcode, flags, tlv_offset, transaction_id, ttl,
+ relay_action) = struct.unpack_from(cls._PACK_STR, buf)
+ md_lv = int(md_lv_version >> 5)
+ version = int(md_lv_version & 0x1f)
+ use_fdb_only = int(flags >> 7)
+ fwd_yes = int(flags >> 6 & 0x01)
+ terminal_mep = int(flags >> 5 & 0x01)
+ tlvs = cls._parser_tlvs(buf[cls._MIN_LEN:])
+ return cls(md_lv, version, use_fdb_only, fwd_yes, terminal_mep,
+ transaction_id, ttl, relay_action, tlvs)
+
+ def serialize(self):
+ buf = struct.pack(self._PACK_STR,
+ (self.md_lv << 5) | self.version,
+ self._opcode,
+ (self.use_fdb_only << 7) |
+ (self.fwd_yes << 6) |
+ (self.terminal_mep << 5),
+ self._TLV_OFFSET,
+ self.transaction_id,
+ self.ttl,
+ self.relay_action,
+ )
+ buf = bytearray(buf)
+ if self.tlvs:
+ buf.extend(self._serialize_tlvs(self.tlvs))
+ buf.extend(struct.pack("!B", CFM_END_TLV))
+ return buf
+
+
+cfm.set_classes(cfm._CFM_OPCODE)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class tlv(stringify.StringifyMixin):
+
+ _TYPE_LEN = 1
+ _LENGTH_LEN = 2
+ _TYPE = {
+ 'ascii': [
+ 'egress_id_mac', 'last_egress_id_mac',
+ 'next_egress_id_mac', 'mac_address'
+ ]
+ }
+
+ def __init__(self, length):
+ self.length = length
+
+ @classmethod
+ @abc.abstractmethod
+ def parser(cls, buf):
+ pass
+
+ @abc.abstractmethod
+ def serialize(self):
+ pass
+
+ def __len__(self):
+ return self.length + self._TYPE_LEN + self._LENGTH_LEN
+
+
+@operation.register_tlv_types(CFM_SENDER_ID_TLV)
+class sender_id_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Sender ID TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ chassis_id_length Chassis ID Length.
+ (0 means automatically-calculate when encoding.)
+ chassis_id_subtype Chassis ID Subtype.
+ The default is 4 (Mac Address)
+ chassis_id Chassis ID.
+ ma_domain_length Management Address Domain Length.
+ (0 means automatically-calculate when encoding.)
+ ma_domain Management Address Domain.
+ ma_length Management Address Length.
+ (0 means automatically-calculate when encoding.)
+ ma Management Address.
+ ==================== =======================================
+ """
+
+ _PACK_STR = '!BHB'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _CHASSIS_ID_LENGTH_LEN = 1
+ _CHASSIS_ID_SUBTYPE_LEN = 1
+ _MA_DOMAIN_LENGTH_LEN = 1
+ _MA_LENGTH_LEN = 1
+
+ # Chassis ID subtype enumeration
+ _CHASSIS_ID_CHASSIS_COMPONENT = 1
+ _CHASSIS_ID_INTERFACE_ALIAS = 2
+ _CHASSIS_ID_PORT_COMPONENT = 3
+ _CHASSIS_ID_MAC_ADDRESS = 4
+ _CHASSIS_ID_NETWORK_ADDRESS = 5
+ _CHASSIS_ID_INTERFACE_NAME = 6
+ _CHASSIS_ID_LOCALLY_ASSIGNED = 7
+
+ def __init__(self,
+ length=0,
+ chassis_id_length=0,
+ chassis_id_subtype=_CHASSIS_ID_MAC_ADDRESS,
+ chassis_id="",
+ ma_domain_length=0,
+ ma_domain="",
+ ma_length=0,
+ ma=""
+ ):
+ super(sender_id_tlv, self).__init__(length)
+ self._type = CFM_SENDER_ID_TLV
+ self.chassis_id_length = chassis_id_length
+ assert chassis_id_subtype in [
+ self._CHASSIS_ID_CHASSIS_COMPONENT,
+ self._CHASSIS_ID_INTERFACE_ALIAS,
+ self._CHASSIS_ID_PORT_COMPONENT,
+ self._CHASSIS_ID_MAC_ADDRESS,
+ self._CHASSIS_ID_NETWORK_ADDRESS,
+ self._CHASSIS_ID_INTERFACE_NAME,
+ self._CHASSIS_ID_LOCALLY_ASSIGNED]
+ self.chassis_id_subtype = chassis_id_subtype
+ self.chassis_id = chassis_id
+ self.ma_domain_length = ma_domain_length
+ self.ma_domain = ma_domain
+ self.ma_length = ma_length
+ self.ma = ma
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length, chassis_id_length) = struct.unpack_from(cls._PACK_STR,
+ buf)
+ chassis_id_subtype = 4
+ chassis_id = ""
+ ma_domain_length = 0
+ ma_domain = ""
+ ma_length = 0
+ ma = ""
+ offset = cls._MIN_LEN
+ if chassis_id_length != 0:
+ (chassis_id_subtype, ) = struct.unpack_from("!B", buf, offset)
+ offset += cls._CHASSIS_ID_SUBTYPE_LEN
+ form = "%ds" % chassis_id_length
+ (chassis_id,) = struct.unpack_from(form, buf, offset)
+ offset += chassis_id_length
+ if length + (cls._TYPE_LEN + cls._LENGTH_LEN) > offset:
+ (ma_domain_length, ) = struct.unpack_from("!B", buf, offset)
+ offset += cls._MA_DOMAIN_LENGTH_LEN
+ form = "%ds" % ma_domain_length
+ (ma_domain, ) = struct.unpack_from(form, buf, offset)
+ offset += ma_domain_length
+ if length + (cls._TYPE_LEN + cls._LENGTH_LEN) > offset:
+ (ma_length, ) = struct.unpack_from("!B", buf, offset)
+ offset += cls._MA_LENGTH_LEN
+ form = "%ds" % ma_length
+ (ma, ) = struct.unpack_from(form, buf, offset)
+ return cls(length, chassis_id_length, chassis_id_subtype,
+ chassis_id, ma_domain_length, ma_domain, ma_length, ma)
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.chassis_id_length == 0:
+ self.chassis_id_length = len(self.chassis_id)
+ if self.ma_domain_length == 0:
+ self.ma_domain_length = len(self.ma_domain)
+ if self.ma_length == 0:
+ self.ma_length = len(self.ma)
+ if self.length == 0:
+ self.length += self._CHASSIS_ID_LENGTH_LEN
+ if self.chassis_id_length != 0:
+ self.length += (self._CHASSIS_ID_SUBTYPE_LEN +
+ self.chassis_id_length)
+ if self.chassis_id_length != 0 or self.ma_domain_length != 0:
+ self.length += self._MA_DOMAIN_LENGTH_LEN
+ if self.ma_domain_length != 0:
+ self.length += (self.ma_domain_length +
+ self._MA_LENGTH_LEN + self.ma_length)
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.chassis_id_length
+ )
+ buf = bytearray(buf)
+ # Chassis ID Subtype and Chassis ID present
+ # if the Chassis ID Length field contains not 0.
+ if self.chassis_id_length != 0:
+ buf.extend(struct.pack("!B", self.chassis_id_subtype))
+ form = "%ds" % self.chassis_id_length
+ buf.extend(struct.pack(form, self.chassis_id))
+ # Management Address Domain Length present
+ # if the Chassis ID Length field or Management Address Length field
+ # contains not 0.
+ if self.chassis_id_length != 0 or self.ma_domain_length != 0:
+ buf.extend(struct.pack("!B", self.ma_domain_length))
+ # Management Address Domain present
+ # Management Address Domain Length field contains not 0.
+ if self.ma_domain_length != 0:
+ form = "%ds" % self.ma_domain_length
+ buf.extend(struct.pack(form, self.ma_domain))
+ buf.extend(struct.pack("!B", self.ma_length))
+ # Management Address present
+ # Management Address Length field contains not 0.
+ if self.ma_length != 0:
+ form = "%ds" % self.ma_length
+ buf.extend(struct.pack(form, self.ma))
+ return buf
+
+
+@operation.register_tlv_types(CFM_PORT_STATUS_TLV)
+class port_status_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Port Status TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ port_status Port Status.The default is 1 (psUp)
+ ==================== =======================================
+ """
+
+ _PACK_STR = '!BHB'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ # Port Status TLV values
+ _PS_BLOCKED = 1
+ _PS_UP = 2
+
+ def __init__(self, length=0, port_status=_PS_UP):
+ super(port_status_tlv, self).__init__(length)
+ self._type = CFM_PORT_STATUS_TLV
+ assert port_status in [self._PS_BLOCKED, self._PS_UP]
+ self.port_status = port_status
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length,
+ port_status) = struct.unpack_from(cls._PACK_STR, buf)
+ return cls(length, port_status)
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.length == 0:
+ self.length = 1
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.port_status)
+ return bytearray(buf)
+
+
+@operation.register_tlv_types(CFM_DATA_TLV)
+class data_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Data TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ============== =======================================
+ Attribute Description
+ ============== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding)
+ data_value Bit pattern of any of n octets.(n = length)
+ ============== =======================================
+ """
+
+ _PACK_STR = '!BH'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ def __init__(self, length=0, data_value=""
+ ):
+ super(data_tlv, self).__init__(length)
+ self._type = CFM_DATA_TLV
+ self.data_value = data_value
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length) = struct.unpack_from(cls._PACK_STR, buf)
+ form = "%ds" % length
+ (data_value, ) = struct.unpack_from(form, buf, cls._MIN_LEN)
+ return cls(length, data_value)
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.length == 0:
+ self.length = len(self.data_value)
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length)
+ buf = bytearray(buf)
+ form = "%ds" % self.length
+ buf.extend(struct.pack(form, self.data_value))
+ return buf
+
+
+@operation.register_tlv_types(CFM_INTERFACE_STATUS_TLV)
+class interface_status_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Interface Status TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ interface_status Interface Status.The default is 1 (isUp)
+ ==================== =======================================
+ """
+
+ _PACK_STR = '!BHB'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ # Interface Status TLV values
+ _IS_UP = 1
+ _IS_DOWN = 2
+ _IS_TESTING = 3
+ _IS_UNKNOWN = 4
+ _IS_DORMANT = 5
+ _IS_NOT_PRESENT = 6
+ _IS_LOWER_LAYER_DOWN = 7
+
+ def __init__(self, length=0, interface_status=_IS_UP):
+ super(interface_status_tlv, self).__init__(length)
+ self._type = CFM_INTERFACE_STATUS_TLV
+ assert interface_status in [
+ self._IS_UP, self._IS_DOWN, self._IS_TESTING,
+ self._IS_UNKNOWN, self._IS_DORMANT, self._IS_NOT_PRESENT,
+ self._IS_LOWER_LAYER_DOWN]
+ self.interface_status = interface_status
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length,
+ interface_status) = struct.unpack_from(cls._PACK_STR, buf)
+ return cls(length, interface_status)
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.length == 0:
+ self.length = 1
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.interface_status)
+ return bytearray(buf)
+
+
+@operation.register_tlv_types(CFM_LTM_EGRESS_IDENTIFIER_TLV)
+class ltm_egress_identifier_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) LTM EGRESS TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ============== =======================================
+ Attribute Description
+ ============== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ egress_id_ui Egress Identifier of Unique ID.
+ egress_id_mac Egress Identifier of MAC address.
+ ============== =======================================
+ """
+
+ _PACK_STR = '!BHH6s'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ def __init__(self,
+ length=0,
+ egress_id_ui=0,
+ egress_id_mac='00:00:00:00:00:00'
+ ):
+ super(ltm_egress_identifier_tlv, self).__init__(length)
+ self._type = CFM_LTM_EGRESS_IDENTIFIER_TLV
+ self.egress_id_ui = egress_id_ui
+ self.egress_id_mac = egress_id_mac
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length, egress_id_ui,
+ egress_id_mac) = struct.unpack_from(cls._PACK_STR, buf)
+ return cls(length, egress_id_ui,
+ addrconv.mac.bin_to_text(egress_id_mac))
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.length == 0:
+ self.length = 8
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.egress_id_ui,
+ addrconv.mac.text_to_bin(self.egress_id_mac)
+ )
+ return bytearray(buf)
+
+
+@operation.register_tlv_types(CFM_LTR_EGRESS_IDENTIFIER_TLV)
+class ltr_egress_identifier_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) LTR EGRESS TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ==================== =======================================
+ Attribute Description
+ ==================== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ last_egress_id_ui Last Egress Identifier of Unique ID.
+ last_egress_id_mac Last Egress Identifier of MAC address.
+ next_egress_id_ui Next Egress Identifier of Unique ID.
+ next_egress_id_mac Next Egress Identifier of MAC address.
+ ==================== =======================================
+ """
+
+ _PACK_STR = '!BHH6sH6s'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ def __init__(self,
+ length=0,
+ last_egress_id_ui=0,
+ last_egress_id_mac='00:00:00:00:00:00',
+ next_egress_id_ui=0,
+ next_egress_id_mac='00:00:00:00:00:00'
+ ):
+ super(ltr_egress_identifier_tlv, self).__init__(length)
+ self._type = CFM_LTR_EGRESS_IDENTIFIER_TLV
+ self.last_egress_id_ui = last_egress_id_ui
+ self.last_egress_id_mac = last_egress_id_mac
+ self.next_egress_id_ui = next_egress_id_ui
+ self.next_egress_id_mac = next_egress_id_mac
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length,
+ last_egress_id_ui, last_egress_id_mac,
+ next_egress_id_ui, next_egress_id_mac
+ ) = struct.unpack_from(cls._PACK_STR, buf)
+ return cls(length,
+ last_egress_id_ui,
+ addrconv.mac.bin_to_text(last_egress_id_mac),
+ next_egress_id_ui,
+ addrconv.mac.bin_to_text(next_egress_id_mac))
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.length == 0:
+ self.length = 16
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.last_egress_id_ui,
+ addrconv.mac.text_to_bin(self.last_egress_id_mac),
+ self.next_egress_id_ui,
+ addrconv.mac.text_to_bin(self.next_egress_id_mac)
+ )
+ return bytearray(buf)
+
+
+@operation.register_tlv_types(CFM_ORGANIZATION_SPECIFIC_TLV)
+class organization_specific_tlv(tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Organization Specific TLV
+ encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ============== =======================================
+ Attribute Description
+ ============== =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ oui Organizationally Unique Identifier.
+ subtype Subtype.
+ value Value.(optional)
+ ============== =======================================
+ """
+
+ _PACK_STR = '!BH3sB'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _OUI_AND_SUBTYPE_LEN = 4
+
+ def __init__(self,
+ length=0,
+ oui="\x00\x00\x00",
+ subtype=0,
+ value=""
+ ):
+ super(organization_specific_tlv, self).__init__(length)
+ self._type = CFM_ORGANIZATION_SPECIFIC_TLV
+ self.oui = oui
+ self.subtype = subtype
+ self.value = value
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length, oui, subtype) = struct.unpack_from(cls._PACK_STR, buf)
+ value = ""
+ if length > cls._OUI_AND_SUBTYPE_LEN:
+ form = "%ds" % (length - cls._OUI_AND_SUBTYPE_LEN)
+ (value,) = struct.unpack_from(form, buf, cls._MIN_LEN)
+ return cls(length, oui, subtype, value)
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.length == 0:
+ self.length = len(self.value) + self._OUI_AND_SUBTYPE_LEN
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.oui,
+ self.subtype,
+ )
+ buf = bytearray(buf)
+ form = "%ds" % (self.length - self._OUI_AND_SUBTYPE_LEN)
+ buf.extend(struct.pack(form, self.value))
+ return buf
+
+
+class reply_tlv(tlv):
+
+ _PACK_STR = '!BHB6s'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _MIN_VALUE_LEN = _MIN_LEN - struct.calcsize('!BH')
+ _PORT_ID_LENGTH_LEN = 1
+ _PORT_ID_SUBTYPE_LEN = 1
+
+ def __init__(self, length, action, mac_address, port_id_length,
+ port_id_subtype, port_id):
+ super(reply_tlv, self).__init__(length)
+ self.action = action
+ self.mac_address = mac_address
+ self.port_id_length = port_id_length
+ self.port_id_subtype = port_id_subtype
+ self.port_id = port_id
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, length, action,
+ mac_address) = struct.unpack_from(cls._PACK_STR, buf)
+ port_id_length = 0
+ port_id_subtype = 0
+ port_id = ""
+ if length > cls._MIN_VALUE_LEN:
+ (port_id_length,
+ port_id_subtype) = struct.unpack_from('!2B', buf, cls._MIN_LEN)
+ form = "%ds" % port_id_length
+ (port_id,) = struct.unpack_from(form, buf,
+ cls._MIN_LEN +
+ cls._PORT_ID_LENGTH_LEN +
+ cls._PORT_ID_SUBTYPE_LEN)
+ return cls(length, action,
+ addrconv.mac.bin_to_text(mac_address),
+ port_id_length, port_id_subtype, port_id)
+
+ def serialize(self):
+ # calculate length when it contains 0
+ if self.port_id_length == 0:
+ self.port_id_length = len(self.port_id)
+ if self.length == 0:
+ self.length = self._MIN_VALUE_LEN
+ if self.port_id_length != 0:
+ self.length += (self.port_id_length +
+ self._PORT_ID_LENGTH_LEN +
+ self._PORT_ID_SUBTYPE_LEN)
+ # start serialize
+ buf = struct.pack(self._PACK_STR,
+ self._type,
+ self.length,
+ self.action,
+ addrconv.mac.text_to_bin(self.mac_address),
+ )
+ buf = bytearray(buf)
+ if self.port_id_length != 0:
+ buf.extend(struct.pack("!BB",
+ self.port_id_length,
+ self.port_id_subtype))
+ form = "%ds" % self.port_id_length
+ buf.extend(struct.pack(form, self.port_id))
+ return buf
+
+
+@operation.register_tlv_types(CFM_REPLY_INGRESS_TLV)
+class reply_ingress_tlv(reply_tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Reply Ingress TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ================= =======================================
+ Attribute Description
+ ================= =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ action Ingress Action.The default is 1 (IngOK)
+ mac_address Ingress MAC Address.
+ port_id_length Ingress PortID Length.
+ (0 means automatically-calculate when encoding.)
+ port_id_subtype Ingress PortID Subtype.
+ port_id Ingress PortID.
+ ================= =======================================
+ """
+
+ # Ingress Action field values
+ _ING_OK = 1
+ _ING_DOWN = 2
+ _ING_BLOCKED = 3
+ _ING_VID = 4
+
+ def __init__(self,
+ length=0,
+ action=_ING_OK,
+ mac_address='00:00:00:00:00:00',
+ port_id_length=0,
+ port_id_subtype=0,
+ port_id=""
+ ):
+ super(reply_ingress_tlv, self).__init__(length, action,
+ mac_address, port_id_length,
+ port_id_subtype, port_id)
+ assert action in [self._ING_OK, self._ING_DOWN,
+ self._ING_BLOCKED, self._ING_VID]
+ self._type = CFM_REPLY_INGRESS_TLV
+
+
+@operation.register_tlv_types(CFM_REPLY_EGRESS_TLV)
+class reply_egress_tlv(reply_tlv):
+
+ """CFM (IEEE Std 802.1ag-2007) Reply Egress TLV encoder/decoder class.
+
+ This is used with ryu.lib.packet.cfm.cfm.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ .. tabularcolumns:: |l|L|
+
+ ================= =======================================
+ Attribute Description
+ ================= =======================================
+ length Length of Value field.
+ (0 means automatically-calculate when encoding.)
+ action Egress Action.The default is 1 (EgrOK)
+ mac_address Egress MAC Address.
+ port_id_length Egress PortID Length.
+ (0 means automatically-calculate when encoding.)
+ port_id_subtype Egress PortID Subtype.
+ port_id Egress PortID.
+ ================= =======================================
+ """
+
+ # Egress Action field values
+ _EGR_OK = 1
+ _EGR_DOWN = 2
+ _EGR_BLOCKED = 3
+ _EGR_VID = 4
+
+ def __init__(self,
+ length=0,
+ action=_EGR_OK,
+ mac_address='00:00:00:00:00:00',
+ port_id_length=0,
+ port_id_subtype=0,
+ port_id=""
+ ):
+ super(reply_egress_tlv, self).__init__(length, action,
+ mac_address, port_id_length,
+ port_id_subtype, port_id)
+ assert action in [self._EGR_OK, self._EGR_DOWN,
+ self._EGR_BLOCKED, self._EGR_VID]
+ self._type = CFM_REPLY_EGRESS_TLV
+
+
+operation.set_classes(operation._TLV_TYPES)
diff --git a/ryu/lib/packet/vlan.py b/ryu/lib/packet/vlan.py
index a4f4332c..e44afce6 100644
--- a/ryu/lib/packet/vlan.py
+++ b/ryu/lib/packet/vlan.py
@@ -24,6 +24,7 @@ from . import lldp
from . import slow
from . import llc
from . import pbb
+from . import cfm
from ryu.ofproto import ether
@@ -121,6 +122,7 @@ vlan.register_packet_type(ipv6.ipv6, ether.ETH_TYPE_IPV6)
vlan.register_packet_type(lldp.lldp, ether.ETH_TYPE_LLDP)
vlan.register_packet_type(slow.slow, ether.ETH_TYPE_SLOW)
vlan.register_packet_type(llc.llc, ether.ETH_TYPE_IEEE802_3)
+vlan.register_packet_type(cfm.cfm, ether.ETH_TYPE_CFM)
svlan.register_packet_type(vlan, ether.ETH_TYPE_8021Q)
svlan.register_packet_type(pbb.itag, ether.ETH_TYPE_8021AH)
diff --git a/ryu/ofproto/ether.py b/ryu/ofproto/ether.py
index f17f7be6..bfa78995 100644
--- a/ryu/ofproto/ether.py
+++ b/ryu/ofproto/ether.py
@@ -24,3 +24,4 @@ ETH_TYPE_8021AD = 0x88a8
ETH_TYPE_LLDP = 0x88cc
ETH_TYPE_8021AH = 0x88e7
ETH_TYPE_IEEE802_3 = 0x05dc
+ETH_TYPE_CFM = 0x8902
diff --git a/ryu/tests/unit/packet/test_cfm.py b/ryu/tests/unit/packet/test_cfm.py
new file mode 100644
index 00000000..49207e92
--- /dev/null
+++ b/ryu/tests/unit/packet/test_cfm.py
@@ -0,0 +1,1760 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import logging
+import inspect
+import struct
+
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.lib import addrconv
+from ryu.lib.packet import cfm
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_cfm(unittest.TestCase):
+
+ def setUp(self):
+ self.message = cfm.cc_message()
+ self.ins = cfm.cfm(self.message)
+ data = bytearray()
+ prev = None
+ self.buf = self.ins.serialize(data, prev)
+
+ def setUp_cc_message(self):
+ self.cc_message_md_lv = 1
+ self.cc_message_version = 1
+ self.cc_message_rdi = 1
+ self.cc_message_interval = 1
+ self.cc_message_seq_num = 123
+ self.cc_message_mep_id = 4
+ self.cc_message_md_name_format = 4
+ self.cc_message_md_name_length = 0
+ self.cc_message_md_name = "hoge"
+ self.cc_message_short_ma_name_format = 2
+ self.cc_message_short_ma_name_length = 0
+ self.cc_message_short_ma_name = "pakeratta"
+ self.cc_message_md_name_txfcf = 11
+ self.cc_message_md_name_rxfcb = 22
+ self.cc_message_md_name_txfcb = 33
+ self.cc_message_tlvs = [
+ cfm.sender_id_tlv(),
+ cfm.port_status_tlv(),
+ cfm.data_tlv(),
+ cfm.interface_status_tlv(),
+ cfm.reply_ingress_tlv(),
+ cfm.reply_egress_tlv(),
+ cfm.ltm_egress_identifier_tlv(),
+ cfm.ltr_egress_identifier_tlv(),
+ cfm.organization_specific_tlv(),
+ ]
+ self.message = cfm.cc_message(
+ self.cc_message_md_lv,
+ self.cc_message_version,
+ self.cc_message_rdi,
+ self.cc_message_interval,
+ self.cc_message_seq_num,
+ self.cc_message_mep_id,
+ self.cc_message_md_name_format,
+ self.cc_message_md_name_length,
+ self.cc_message_md_name,
+ self.cc_message_short_ma_name_format,
+ self.cc_message_short_ma_name_length,
+ self.cc_message_short_ma_name,
+ self.cc_message_tlvs
+ )
+ self.ins = cfm.cfm(self.message)
+ data = bytearray()
+ prev = None
+ self.buf = self.ins.serialize(data, prev)
+
+ def setUp_loopback_message(self):
+ self.loopback_message_md_lv = 1
+ self.loopback_message_version = 1
+ self.loopback_message_transaction_id = 12345
+ self.loopback_message_tlvs = [
+ cfm.sender_id_tlv(),
+ cfm.port_status_tlv(),
+ cfm.data_tlv(),
+ cfm.interface_status_tlv(),
+ cfm.reply_ingress_tlv(),
+ cfm.reply_egress_tlv(),
+ cfm.ltm_egress_identifier_tlv(),
+ cfm.ltr_egress_identifier_tlv(),
+ cfm.organization_specific_tlv(),
+ ]
+ self.message = cfm.loopback_message(
+ self.loopback_message_md_lv,
+ self.loopback_message_version,
+ self.loopback_message_transaction_id,
+ self.loopback_message_tlvs)
+ self.ins = cfm.cfm(self.message)
+ data = bytearray()
+ prev = None
+ self.buf = self.ins.serialize(data, prev)
+
+ def setUp_loopback_reply(self):
+ self.loopback_reply_md_lv = 1
+ self.loopback_reply_version = 1
+ self.loopback_reply_transaction_id = 12345
+ self.loopback_reply_tlvs = [
+ cfm.sender_id_tlv(),
+ cfm.port_status_tlv(),
+ cfm.data_tlv(),
+ cfm.interface_status_tlv(),
+ cfm.reply_ingress_tlv(),
+ cfm.reply_egress_tlv(),
+ cfm.ltm_egress_identifier_tlv(),
+ cfm.ltr_egress_identifier_tlv(),
+ cfm.organization_specific_tlv(),
+ ]
+ self.message = cfm.loopback_reply(
+ self.loopback_reply_md_lv,
+ self.loopback_reply_version,
+ self.loopback_reply_transaction_id,
+ self.loopback_reply_tlvs)
+ self.ins = cfm.cfm(self.message)
+ data = bytearray()
+ prev = None
+ self.buf = self.ins.serialize(data, prev)
+
+ def setUp_link_trace_message(self):
+ self.link_trace_message_md_lv = 1
+ self.link_trace_message_version = 1
+ self.link_trace_message_use_fdb_only = 1
+ self.link_trace_message_transaction_id = 12345
+ self.link_trace_message_ttl = 123
+ self.link_trace_message_ltm_orig_addr = '11:22:33:44:55:66'
+ self.link_trace_message_ltm_targ_addr = '77:88:99:aa:cc:dd'
+ self.link_trace_message_tlvs = [
+ cfm.sender_id_tlv(),
+ cfm.port_status_tlv(),
+ cfm.data_tlv(),
+ cfm.interface_status_tlv(),
+ cfm.reply_ingress_tlv(),
+ cfm.reply_egress_tlv(),
+ cfm.ltm_egress_identifier_tlv(),
+ cfm.ltr_egress_identifier_tlv(),
+ cfm.organization_specific_tlv(),
+ ]
+ self.message = cfm.link_trace_message(
+ self.link_trace_message_md_lv,
+ self.link_trace_message_version,
+ self.link_trace_message_use_fdb_only,
+ self.link_trace_message_transaction_id,
+ self.link_trace_message_ttl,
+ self.link_trace_message_ltm_orig_addr,
+ self.link_trace_message_ltm_targ_addr,
+ self.link_trace_message_tlvs)
+ self.ins = cfm.cfm(self.message)
+ data = bytearray()
+ prev = None
+ self.buf = self.ins.serialize(data, prev)
+
+ def setUp_link_trace_reply(self):
+ self.link_trace_reply_md_lv = 1
+ self.link_trace_reply_version = 1
+ self.link_trace_reply_use_fdb_only = 1
+ self.link_trace_reply_fwd_yes = 0
+ self.link_trace_reply_terminal_mep = 1
+ self.link_trace_reply_transaction_id = 5432
+ self.link_trace_reply_ttl = 123
+ self.link_trace_reply_relay_action = 3
+ self.link_trace_reply_tlvs = [
+ cfm.sender_id_tlv(),
+ cfm.port_status_tlv(),
+ cfm.data_tlv(),
+ cfm.interface_status_tlv(),
+ cfm.reply_ingress_tlv(),
+ cfm.reply_egress_tlv(),
+ cfm.ltm_egress_identifier_tlv(),
+ cfm.ltr_egress_identifier_tlv(),
+ cfm.organization_specific_tlv(),
+ ]
+ self.message = cfm.link_trace_reply(
+ self.link_trace_reply_md_lv,
+ self.link_trace_reply_version,
+ self.link_trace_reply_use_fdb_only,
+ self.link_trace_reply_fwd_yes,
+ self.link_trace_reply_terminal_mep,
+ self.link_trace_reply_transaction_id,
+ self.link_trace_reply_ttl,
+ self.link_trace_reply_relay_action,
+ self.link_trace_reply_tlvs)
+ self.ins = cfm.cfm(self.message)
+ data = bytearray()
+ prev = None
+ self.buf = self.ins.serialize(data, prev)
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(str(self.message), str(self.ins.op))
+
+ def test_init_cc_message(self):
+ self.setUp_cc_message()
+ self.test_init()
+
+ def test_init_loopback_message(self):
+ self.setUp_loopback_message()
+ self.test_init()
+
+ def test_init_loopback_reply(self):
+ self.setUp_loopback_reply()
+ self.test_init()
+
+ def test_init_link_trace_message(self):
+ self.setUp_link_trace_message()
+ self.test_init()
+
+ def test_init_link_trace_reply(self):
+ self.setUp_link_trace_reply()
+ self.test_init()
+
+ def test_parser(self):
+ _res = self.ins.parser(str(self.buf))
+
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(str(self.message), str(res.op))
+
+ def test_parser_with_cc_message(self):
+ self.setUp_cc_message()
+ self.test_parser()
+
+ def test_parser_with_loopback_message(self):
+ self.setUp_loopback_message()
+ self.test_parser()
+
+ def test_parser_with_loopback_reply(self):
+ self.setUp_loopback_reply()
+ self.test_parser()
+
+ def test_parser_with_link_trace_message(self):
+ self.setUp_link_trace_message()
+ self.test_parser()
+
+ def test_parser_with_link_trace_reply(self):
+ self.setUp_link_trace_reply()
+ self.test_parser()
+
+ def test_serialize(self):
+ pass
+
+ def test_serialize_with_cc_message(self):
+ self.setUp_cc_message()
+ self.test_serialize()
+ data = bytearray()
+ prev = None
+ buf = self.ins.serialize(data, prev)
+ cc_message = cfm.cc_message.parser(str(buf))
+ eq_(repr(self.message), repr(cc_message))
+
+ def test_serialize_with_loopback_message(self):
+ self.setUp_loopback_message()
+ self.test_serialize()
+ data = bytearray()
+ prev = None
+ buf = self.ins.serialize(data, prev)
+ loopback_message = cfm.loopback_message.parser(str(buf))
+ eq_(repr(self.message), repr(loopback_message))
+
+ def test_serialize_with_loopback_reply(self):
+ self.setUp_loopback_reply()
+ self.test_serialize()
+ data = bytearray()
+ prev = None
+ buf = self.ins.serialize(data, prev)
+ loopback_reply = cfm.loopback_reply.parser(str(buf))
+ eq_(repr(self.message), repr(loopback_reply))
+
+ def test_serialize_with_link_trace_message(self):
+ self.setUp_link_trace_message()
+ self.test_serialize()
+ data = bytearray()
+ prev = None
+ buf = self.ins.serialize(data, prev)
+ link_trace_message = cfm.link_trace_message.parser(str(buf))
+ eq_(repr(self.message), repr(link_trace_message))
+
+ def test_serialize_with_link_trace_reply(self):
+ self.setUp_link_trace_reply()
+ self.test_serialize()
+ data = bytearray()
+ prev = None
+ buf = self.ins.serialize(data, prev)
+ link_trace_reply = cfm.link_trace_reply.parser(str(buf))
+ eq_(repr(self.message), repr(link_trace_reply))
+
+ def test_to_string(self):
+ cfm_values = {'op': self.message}
+ _cfm_str = ','.join(['%s=%s' % (k, cfm_values[k])
+ for k, v in inspect.getmembers(self.ins)
+ if k in cfm_values])
+ cfm_str = '%s(%s)' % (cfm.cfm.__name__, _cfm_str)
+ eq_(str(self.ins), cfm_str)
+ eq_(repr(self.ins), cfm_str)
+
+ def test_to_string_cc_message(self):
+ self.setUp_cc_message()
+ self.test_to_string()
+
+ def test_to_string_loopback_message(self):
+ self.setUp_loopback_message()
+ self.test_to_string()
+
+ def test_to_string_loopback_reply(self):
+ self.setUp_loopback_reply()
+ self.test_to_string()
+
+ def test_to_string_link_trace_message(self):
+ self.setUp_link_trace_message()
+ self.test_to_string()
+
+ def test_to_string_link_trace_reply(self):
+ self.setUp_link_trace_reply()
+ self.test_to_string()
+
+ def test_len(self):
+ pass
+
+ def test_len_cc_message(self):
+ self.setUp_cc_message()
+ eq_(len(self.ins), 0 + len(self.message))
+
+ def test_len_loopback_message(self):
+ self.setUp_loopback_message()
+ eq_(len(self.ins), 0 + len(self.message))
+
+ def test_len_loopback_reply(self):
+ self.setUp_loopback_reply()
+ eq_(len(self.ins), 0 + len(self.message))
+
+ def test_len_link_trace_message(self):
+ self.setUp_link_trace_message()
+ eq_(len(self.ins), 0 + len(self.message))
+
+ def test_len_link_trace_reply(self):
+ self.setUp_link_trace_reply()
+ eq_(len(self.ins), 0 + len(self.message))
+
+ def test_default_args(self):
+ pass
+
+ def test_json(self):
+ jsondict = self.ins.to_jsondict()
+ ins = cfm.cfm.from_jsondict(jsondict['cfm'])
+ eq_(str(self.ins), str(ins))
+
+ def test_json_with_cc_message(self):
+ self.setUp_cc_message()
+ self.test_json()
+
+ def test_json_with_loopback_message(self):
+ self.setUp_loopback_message()
+ self.test_json()
+
+ def test_json_with_loopback_reply(self):
+ self.setUp_loopback_reply()
+ self.test_json()
+
+ def test_json_with_link_trace_message(self):
+ self.setUp_link_trace_message()
+ self.test_json()
+
+ def test_json_with_link_trace_reply(self):
+ self.setUp_link_trace_reply()
+ self.test_json()
+
+
+class Test_cc_message(unittest.TestCase):
+
+ def setUp(self):
+ self.md_lv = 1
+ self.version = 1
+ self.opcode = cfm.CFM_CC_MESSAGE
+ self.rdi = 1
+ self.interval = 5
+ self.first_tlv_offset = cfm.cc_message._TLV_OFFSET
+ self.seq_num = 2
+ self.mep_id = 2
+ self.md_name_format = cfm.cc_message._MD_FMT_CHARACTER_STRING
+ self.md_name_length = 3
+ self.md_name = "foo"
+ self.short_ma_name_format = 2
+ self.short_ma_name_length = 8
+ self.short_ma_name = "hogehoge"
+ self.tlvs = [
+ ]
+ self.end_tlv = 0
+ self.ins = cfm.cc_message(
+ self.md_lv,
+ self.version,
+ self.rdi,
+ self.interval,
+ self.seq_num,
+ self.mep_id,
+ self.md_name_format,
+ self.md_name_length,
+ self.md_name,
+ self.short_ma_name_format,
+ self.short_ma_name_length,
+ self.short_ma_name,
+ self.tlvs
+ )
+
+ self.form = '!4BIH2B3s2B8s33x12x4xB'
+ self.buf = struct.pack(
+ self.form,
+ (self.md_lv << 5) | self.version,
+ self.opcode,
+ (self.rdi << 7) | self.interval,
+ self.first_tlv_offset,
+ self.seq_num,
+ self.mep_id,
+ self.md_name_format,
+ self.md_name_length,
+ self.md_name,
+ self.short_ma_name_format,
+ self.short_ma_name_length,
+ self.short_ma_name,
+ self.end_tlv
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.md_lv, self.ins.md_lv)
+ eq_(self.version, self.ins.version)
+ eq_(self.rdi, self.ins.rdi)
+ eq_(self.interval, self.ins.interval)
+ eq_(self.seq_num, self.ins.seq_num)
+ eq_(self.mep_id, self.ins.mep_id)
+ eq_(self.md_name_format, self.ins.md_name_format)
+ eq_(self.md_name_length, self.ins.md_name_length)
+ eq_(self.md_name, self.ins.md_name)
+ eq_(self.short_ma_name_format, self.ins.short_ma_name_format)
+ eq_(self.short_ma_name_length, self.ins.short_ma_name_length)
+ eq_(self.short_ma_name, self.ins.short_ma_name)
+ eq_(self.tlvs, self.ins.tlvs)
+
+ def test_parser(self):
+ _res = cfm.cc_message.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.md_lv, res.md_lv)
+ eq_(self.version, res.version)
+ eq_(self.rdi, res.rdi)
+ eq_(self.interval, res.interval)
+ eq_(self.seq_num, res.seq_num)
+ eq_(self.mep_id, res.mep_id)
+ eq_(self.md_name_format, res.md_name_format)
+ eq_(self.md_name_length, res.md_name_length)
+ eq_(self.md_name, res.md_name)
+ eq_(self.short_ma_name_format, res.short_ma_name_format)
+ eq_(self.short_ma_name_length, res.short_ma_name_length)
+ eq_(self.short_ma_name, res.short_ma_name)
+ eq_(self.tlvs, res.tlvs)
+
+ def test_parser_with_no_maintenance_domain_name_present(self):
+ form = '!4BIH3B8s37x12x4xB'
+ buf = struct.pack(
+ form,
+ (self.md_lv << 5) | self.version,
+ self.opcode,
+ (self.rdi << 7) | self.interval,
+ self.first_tlv_offset,
+ self.seq_num,
+ self.mep_id,
+ cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT,
+ self.short_ma_name_format,
+ self.short_ma_name_length,
+ self.short_ma_name,
+ self.end_tlv
+ )
+ _res = cfm.cc_message.parser(buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.md_lv, res.md_lv)
+ eq_(self.version, res.version)
+ eq_(self.rdi, res.rdi)
+ eq_(self.interval, res.interval)
+ eq_(self.seq_num, res.seq_num)
+ eq_(self.mep_id, res.mep_id)
+ eq_(cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, res.md_name_format)
+ eq_(self.short_ma_name_format, res.short_ma_name_format)
+ eq_(self.short_ma_name_length, res.short_ma_name_length)
+ eq_(self.short_ma_name, res.short_ma_name)
+ eq_(self.tlvs, res.tlvs)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.rdi, res[2] >> 7)
+ eq_(self.interval, res[2] & 0x07)
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.seq_num, res[4])
+ eq_(self.mep_id, res[5])
+ eq_(self.md_name_format, res[6])
+ eq_(self.md_name_length, res[7])
+ eq_(self.md_name, res[8])
+ eq_(self.short_ma_name_format, res[9])
+ eq_(self.short_ma_name_length, res[10])
+ eq_(self.short_ma_name, res[11])
+ eq_(self.end_tlv, res[12])
+
+ def test_serialize_with_md_name_length_zero(self):
+ ins = cfm.cc_message(
+ self.md_lv,
+ self.version,
+ self.rdi,
+ self.interval,
+ self.seq_num,
+ self.mep_id,
+ self.md_name_format,
+ 0,
+ self.md_name,
+ self.short_ma_name_format,
+ 0,
+ self.short_ma_name,
+ self.tlvs
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.rdi, res[2] >> 7)
+ eq_(self.interval, res[2] & 0x07)
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.seq_num, res[4])
+ eq_(self.mep_id, res[5])
+ eq_(self.md_name_format, res[6])
+ eq_(self.md_name_length, res[7])
+ eq_(self.md_name, res[8])
+ eq_(self.short_ma_name_format, res[9])
+ eq_(self.short_ma_name_length, res[10])
+ eq_(self.short_ma_name, res[11])
+ eq_(self.end_tlv, res[12])
+
+ def test_serialize_with_no_maintenance_domain_name_present(self):
+ form = '!4BIH3B8s37x12x4xB'
+ ins = cfm.cc_message(
+ self.md_lv,
+ self.version,
+ self.rdi,
+ self.interval,
+ self.seq_num,
+ self.mep_id,
+ cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT,
+ 0,
+ self.md_name,
+ self.short_ma_name_format,
+ 0,
+ self.short_ma_name,
+ self.tlvs
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.rdi, res[2] >> 7)
+ eq_(self.interval, res[2] & 0x07)
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.seq_num, res[4])
+ eq_(self.mep_id, res[5])
+ eq_(cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, res[6])
+ eq_(self.short_ma_name_format, res[7])
+ eq_(self.short_ma_name_length, res[8])
+ eq_(self.short_ma_name, res[9])
+ eq_(self.end_tlv, res[10])
+
+ def test_len(self):
+ # 75 octet (If tlv does not exist)
+ eq_(75, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.cc_message()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.cc_message._PACK_STR, str(buf))
+ eq_(res[0] >> 5, 0)
+ eq_(res[0] & 0x1f, 0)
+ eq_(res[1], 1)
+ eq_(res[2] >> 7, 0)
+ eq_(res[2] & 0x07, 4)
+ eq_(res[3], 70)
+ eq_(res[4], 0)
+ eq_(res[5], 1)
+ eq_(res[6], 4)
+
+
+class Test_loopback_message(unittest.TestCase):
+
+ def setUp(self):
+ self.md_lv = 1
+ self.version = 1
+ self.opcode = cfm.CFM_LOOPBACK_MESSAGE
+ self.flags = 0
+ self.first_tlv_offset = cfm.loopback_message._TLV_OFFSET
+ self.transaction_id = 12345
+ self.tlvs = [
+ ]
+
+ self.end_tlv = 0
+ self.ins = cfm.loopback_message(
+ self.md_lv,
+ self.version,
+ self.transaction_id,
+ self.tlvs,
+ )
+ self.form = '!4BIB'
+ self.buf = struct.pack(
+ self.form,
+ (self.md_lv << 5) | self.version,
+ self.opcode,
+ self.flags,
+ self.first_tlv_offset,
+ self.transaction_id,
+ self.end_tlv
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.md_lv, self.ins.md_lv)
+ eq_(self.version, self.ins.version)
+ eq_(self.transaction_id, self.ins.transaction_id)
+ eq_(self.tlvs, self.ins.tlvs)
+
+ def test_parser(self):
+ _res = cfm.loopback_message.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.md_lv, res.md_lv)
+ eq_(self.version, res.version)
+ eq_(self.transaction_id, res.transaction_id)
+ eq_(self.tlvs, res.tlvs)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.flags, res[2])
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.transaction_id, res[4])
+ eq_(self.end_tlv, res[5])
+
+ def test_len(self):
+ # 9 octet (If tlv does not exist)
+ eq_(9, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.loopback_message()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.loopback_message._PACK_STR,
+ str(buf))
+ eq_(res[0] >> 5, 0)
+ eq_(res[0] & 0x1f, 0)
+ eq_(res[1], 3)
+ eq_(res[2], 0)
+ eq_(res[3], 4)
+ eq_(res[4], 0)
+
+
+class Test_loopback_reply(unittest.TestCase):
+
+ def setUp(self):
+ self.md_lv = 1
+ self.version = 1
+ self.opcode = cfm.CFM_LOOPBACK_REPLY
+ self.flags = 0
+ self.first_tlv_offset = cfm.loopback_reply._TLV_OFFSET
+ self.transaction_id = 12345
+ self.tlvs = [
+ ]
+ self.end_tlv = 0
+ self.ins = cfm.loopback_reply(
+ self.md_lv,
+ self.version,
+ self.transaction_id,
+ self.tlvs,
+ )
+ self.form = '!4BIB'
+ self.buf = struct.pack(
+ self.form,
+ (self.md_lv << 5) | self.version,
+ self.opcode,
+ self.flags,
+ self.first_tlv_offset,
+ self.transaction_id,
+ self.end_tlv
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.md_lv, self.ins.md_lv)
+ eq_(self.version, self.ins.version)
+ eq_(self.transaction_id, self.ins.transaction_id)
+ eq_(self.tlvs, self.ins.tlvs)
+
+ def test_parser(self):
+ _res = cfm.loopback_reply.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.md_lv, res.md_lv)
+ eq_(self.version, res.version)
+ eq_(self.transaction_id, res.transaction_id)
+ eq_(self.tlvs, res.tlvs)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.flags, res[2])
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.transaction_id, res[4])
+ eq_(self.end_tlv, res[5])
+
+ def test_len(self):
+ # 9 octet (If tlv does not exist)
+ eq_(9, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.loopback_reply()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.loopback_reply._PACK_STR, str(buf))
+ eq_(res[0] >> 5, 0)
+ eq_(res[0] & 0x1f, 0)
+ eq_(res[1], 2)
+ eq_(res[2], 0)
+ eq_(res[3], 4)
+ eq_(res[4], 0)
+
+
+class Test_link_trace_message(unittest.TestCase):
+
+ def setUp(self):
+ self.md_lv = 1
+ self.version = 1
+ self.opcode = cfm.CFM_LINK_TRACE_MESSAGE
+ self.use_fdb_only = 1
+ self.first_tlv_offset = cfm.link_trace_message._TLV_OFFSET
+ self.transaction_id = 12345
+ self.ttl = 55
+ self.ltm_orig_addr = "00:11:22:44:55:66"
+ self.ltm_targ_addr = "ab:cd:ef:23:12:65"
+ self.tlvs = [
+ ]
+
+ self.end_tlv = 0
+ self.ins = cfm.link_trace_message(
+ self.md_lv,
+ self.version,
+ self.use_fdb_only,
+ self.transaction_id,
+ self.ttl,
+ self.ltm_orig_addr,
+ self.ltm_targ_addr,
+ self.tlvs
+ )
+ self.form = '!4BIB6s6sB'
+ self.buf = struct.pack(
+ self.form,
+ (self.md_lv << 5) | self.version,
+ self.opcode,
+ self.use_fdb_only << 7,
+ self.first_tlv_offset,
+ self.transaction_id,
+ self.ttl,
+ addrconv.mac.text_to_bin(self.ltm_orig_addr),
+ addrconv.mac.text_to_bin(self.ltm_targ_addr),
+ self.end_tlv
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.md_lv, self.ins.md_lv)
+ eq_(self.version, self.ins.version)
+ eq_(self.use_fdb_only, self.ins.use_fdb_only)
+ eq_(self.transaction_id, self.ins.transaction_id)
+ eq_(self.ttl, self.ins.ttl)
+ eq_(self.ltm_orig_addr, self.ins.ltm_orig_addr)
+ eq_(self.ltm_targ_addr, self.ins.ltm_targ_addr)
+ eq_(self.tlvs, self.ins.tlvs)
+
+ def test_parser(self):
+ _res = cfm.link_trace_message.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.md_lv, res.md_lv)
+ eq_(self.version, res.version)
+ eq_(self.use_fdb_only, res.use_fdb_only)
+ eq_(self.transaction_id, res.transaction_id)
+ eq_(self.ttl, res.ttl)
+ eq_(self.ltm_orig_addr, res.ltm_orig_addr)
+ eq_(self.ltm_targ_addr, res.ltm_targ_addr)
+ eq_(self.tlvs, res.tlvs)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.use_fdb_only, res[2] >> 7)
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.transaction_id, res[4])
+ eq_(self.ttl, res[5])
+ eq_(addrconv.mac.text_to_bin(self.ltm_orig_addr), res[6])
+ eq_(addrconv.mac.text_to_bin(self.ltm_targ_addr), res[7])
+ eq_(self.end_tlv, res[8])
+
+ def test_len(self):
+ # 22 octet (If tlv does not exist)
+ eq_(22, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.link_trace_message()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.link_trace_message._PACK_STR, str(buf))
+ eq_(res[0] >> 5, 0)
+ eq_(res[0] & 0x1f, 0)
+ eq_(res[1], 5)
+ eq_(res[2] >> 7, 1)
+ eq_(res[3], 17)
+ eq_(res[4], 0)
+ eq_(res[5], 64)
+ eq_(res[6], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
+ eq_(res[7], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
+
+
+class Test_link_trace_reply(unittest.TestCase):
+
+ def setUp(self):
+ self.md_lv = 1
+ self.version = 1
+ self.opcode = cfm.CFM_LINK_TRACE_REPLY
+ self.use_fdb_only = 1
+ self.fwd_yes = 0
+ self.terminal_mep = 1
+ self.first_tlv_offset = cfm.link_trace_reply._TLV_OFFSET
+ self.transaction_id = 12345
+ self.ttl = 55
+ self.relay_action = 2
+ self.ltm_orig_addr = "00:11:22:aa:bb:cc"
+ self.ltm_targ_addr = "53:45:24:64:ac:ff"
+ self.tlvs = [
+ ]
+ self.end_tlv = 0
+ self.ins = cfm.link_trace_reply(
+ self.md_lv,
+ self.version,
+ self.use_fdb_only,
+ self.fwd_yes,
+ self.terminal_mep,
+ self.transaction_id,
+ self.ttl,
+ self.relay_action,
+ self.tlvs,
+ )
+ self.form = '!4BIBBB'
+ self.buf = struct.pack(
+ self.form,
+ (self.md_lv << 5) | self.version,
+ self.opcode,
+ (self.use_fdb_only << 7) | (self.fwd_yes << 6) |
+ (self.terminal_mep << 5),
+ self.first_tlv_offset,
+ self.transaction_id,
+ self.ttl,
+ self.relay_action,
+ self.end_tlv
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.md_lv, self.ins.md_lv)
+ eq_(self.version, self.ins.version)
+ eq_(self.use_fdb_only, self.ins.use_fdb_only)
+ eq_(self.fwd_yes, self.ins.fwd_yes)
+ eq_(self.terminal_mep, self.ins.terminal_mep)
+ eq_(self.transaction_id, self.ins.transaction_id)
+ eq_(self.ttl, self.ins.ttl)
+ eq_(self.relay_action, self.ins.relay_action)
+ eq_(self.tlvs, self.ins.tlvs)
+
+ def test_parser(self):
+ _res = cfm.link_trace_reply.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.md_lv, res.md_lv)
+ eq_(self.version, res.version)
+ eq_(self.use_fdb_only, self.ins.use_fdb_only)
+ eq_(self.fwd_yes, self.ins.fwd_yes)
+ eq_(self.terminal_mep, self.ins.terminal_mep)
+ eq_(self.transaction_id, res.transaction_id)
+ eq_(self.ttl, res.ttl)
+ eq_(self.relay_action, res.relay_action)
+ eq_(self.tlvs, res.tlvs)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self.md_lv, res[0] >> 5)
+ eq_(self.version, res[0] & 0x1f)
+ eq_(self.opcode, res[1])
+ eq_(self.use_fdb_only, res[2] >> 7 & 0x01)
+ eq_(self.fwd_yes, res[2] >> 6 & 0x01)
+ eq_(self.terminal_mep, res[2] >> 5 & 0x01)
+ eq_(self.first_tlv_offset, res[3])
+ eq_(self.transaction_id, res[4])
+ eq_(self.ttl, res[5])
+ eq_(self.relay_action, res[6])
+ eq_(self.end_tlv, res[7])
+
+ def test_len(self):
+ # 11 octet (If tlv does not exist)
+ eq_(11, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.link_trace_reply()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.link_trace_reply._PACK_STR, str(buf))
+ eq_(res[0] >> 5, 0)
+ eq_(res[0] & 0x1f, 0)
+ eq_(res[1], 4)
+ eq_(res[2] >> 7, 1)
+ eq_(res[2] >> 6 & 0x01, 0)
+ eq_(res[2] >> 5 & 0x01, 1)
+ eq_(res[3], 6)
+ eq_(res[4], 0)
+ eq_(res[5], 64)
+ eq_(res[6], 1)
+
+
+class Test_sender_id_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_SENDER_ID_TLV
+ self.length = 10
+ self.chassis_id_length = 1
+ self.chassis_id_subtype = 3
+ self.chassis_id = "\x0a"
+ self.ma_domain_length = 2
+ self.ma_domain = "\x04\x05"
+ self.ma_length = 3
+ self.ma = "\x01\x02\x03"
+ self.ins = cfm.sender_id_tlv(
+ self.length,
+ self.chassis_id_length,
+ self.chassis_id_subtype,
+ self.chassis_id,
+ self.ma_domain_length,
+ self.ma_domain,
+ self.ma_length,
+ self.ma,
+ )
+ self.form = '!BHBB1sB2sB3s'
+ self.buf = struct.pack(
+ self.form,
+ self._type,
+ self.length,
+ self.chassis_id_length,
+ self.chassis_id_subtype,
+ self.chassis_id,
+ self.ma_domain_length,
+ self.ma_domain,
+ self.ma_length,
+ self.ma
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.chassis_id_length, self.ins.chassis_id_length)
+ eq_(self.chassis_id_subtype, self.ins.chassis_id_subtype)
+ eq_(self.chassis_id, self.ins.chassis_id)
+ eq_(self.ma_domain_length, self.ins.ma_domain_length)
+ eq_(self.ma_domain, self.ins.ma_domain)
+ eq_(self.ma_length, self.ins.ma_length)
+ eq_(self.ma, self.ins.ma)
+
+ def test_parser(self):
+ _res = cfm.sender_id_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.chassis_id_length, res.chassis_id_length)
+ eq_(self.chassis_id_subtype, res.chassis_id_subtype)
+ eq_(self.chassis_id, res.chassis_id)
+ eq_(self.ma_domain_length, res.ma_domain_length)
+ eq_(self.ma_domain, res.ma_domain)
+ eq_(self.ma_length, res.ma_length)
+ eq_(self.ma, res.ma)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.chassis_id_length, res[2])
+ eq_(self.chassis_id_subtype, res[3])
+ eq_(self.chassis_id, res[4])
+ eq_(self.ma_domain_length, res[5])
+ eq_(self.ma_domain, res[6])
+ eq_(self.ma_length, res[7])
+ eq_(self.ma, res[8])
+
+ def test_serialize_semi_normal_ptn1(self):
+ ins = cfm.sender_id_tlv(
+ chassis_id_subtype=self.chassis_id_subtype,
+ chassis_id=self.chassis_id,
+ ma_domain=self.ma_domain,
+ )
+ buf = ins.serialize()
+ form = '!BHBB1sB2sB'
+ res = struct.unpack_from(form, str(buf))
+ eq_(self._type, res[0])
+ eq_(7, res[1])
+ eq_(self.chassis_id_length, res[2])
+ eq_(self.chassis_id_subtype, res[3])
+ eq_(self.chassis_id, res[4])
+ eq_(self.ma_domain_length, res[5])
+ eq_(self.ma_domain, res[6])
+ eq_(0, res[7])
+
+ def test_serialize_semi_normal_ptn2(self):
+ ins = cfm.sender_id_tlv(
+ ma_domain=self.ma_domain,
+ ma=self.ma,
+ )
+ buf = ins.serialize()
+ form = '!BHBB2sB3s'
+ res = struct.unpack_from(form, str(buf))
+ eq_(self._type, res[0])
+ eq_(8, res[1])
+ eq_(0, res[2])
+ eq_(self.ma_domain_length, res[3])
+ eq_(self.ma_domain, res[4])
+ eq_(self.ma_length, res[5])
+ eq_(self.ma, res[6])
+
+ def test_serialize_semi_normal_ptn3(self):
+ ins = cfm.sender_id_tlv(
+ chassis_id_subtype=self.chassis_id_subtype,
+ chassis_id=self.chassis_id,
+ )
+ buf = ins.serialize()
+ form = '!BHBB1sB'
+ res = struct.unpack_from(form, str(buf))
+ eq_(self._type, res[0])
+ eq_(4, res[1])
+ eq_(self.chassis_id_length, res[2])
+ eq_(self.chassis_id_subtype, res[3])
+ eq_(self.chassis_id, res[4])
+ eq_(0, res[5])
+
+ def test_serialize_semi_normal_ptn4(self):
+ ins = cfm.sender_id_tlv(
+ ma_domain=self.ma_domain,
+ )
+ buf = ins.serialize()
+ form = '!BHBB2sB'
+ res = struct.unpack_from(form, str(buf))
+ eq_(self._type, res[0])
+ eq_(5, res[1])
+ eq_(0, res[2])
+ eq_(self.ma_domain_length, res[3])
+ eq_(self.ma_domain, res[4])
+ eq_(0, res[5])
+
+ def test_serialize_with_length_zero(self):
+ ins = cfm.sender_id_tlv(
+ 0,
+ 0,
+ self.chassis_id_subtype,
+ self.chassis_id,
+ 0,
+ self.ma_domain,
+ 0,
+ self.ma,
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.chassis_id_length, res[2])
+ eq_(self.chassis_id_subtype, res[3])
+ eq_(self.chassis_id, res[4])
+ eq_(self.ma_domain_length, res[5])
+ eq_(self.ma_domain, res[6])
+ eq_(self.ma_length, res[7])
+ eq_(self.ma, res[8])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 10, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.sender_id_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.sender_id_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_SENDER_ID_TLV)
+ eq_(res[1], 1)
+ eq_(res[2], 0)
+
+
+class Test_port_status_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_PORT_STATUS_TLV
+ self.length = 1
+ self.port_status = 1
+ self.ins = cfm.port_status_tlv(
+ self.length,
+ self.port_status
+ )
+ self.form = '!BHB'
+ self.buf = struct.pack(
+ self.form,
+ self._type,
+ self.length,
+ self.port_status
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.port_status, self.ins.port_status)
+
+ def test_parser(self):
+ _res = cfm.port_status_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.port_status, res.port_status)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.port_status, res[2])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 1, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.port_status_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.port_status_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_PORT_STATUS_TLV)
+ eq_(res[1], 1)
+ eq_(res[2], 2)
+
+
+class Test_data_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_DATA_TLV
+ self.length = 3
+ self.data_value = "\x01\x02\x03"
+ self.ins = cfm.data_tlv(
+ self.length,
+ self.data_value
+ )
+ self.form = '!BH3s'
+ self.buf = struct.pack(
+ self.form,
+ self._type,
+ self.length,
+ self.data_value,
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.data_value, self.ins.data_value)
+
+ def test_parser(self):
+ _res = cfm.data_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.data_value, res.data_value)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.data_value, res[2])
+
+ def test_serialize_with_length_zero(self):
+ ins = cfm.data_tlv(
+ 0,
+ self.data_value
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.data_value, res[2])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 3, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.data_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.data_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_DATA_TLV)
+ eq_(res[1], 0)
+
+
+class Test_interface_status_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_INTERFACE_STATUS_TLV
+ self.length = 1
+ self.interface_status = 4
+ self.ins = cfm.interface_status_tlv(
+ self.length,
+ self.interface_status
+ )
+ self.form = '!BHB'
+ self.buf = struct.pack(
+ self.form,
+ self._type,
+ self.length,
+ self.interface_status
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.interface_status, self.ins.interface_status)
+
+ def test_parser(self):
+ _res = cfm.interface_status_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.interface_status, res.interface_status)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.interface_status, res[2])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 1, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.interface_status_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.interface_status_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_INTERFACE_STATUS_TLV)
+ eq_(res[1], 1)
+ eq_(res[2], 1)
+
+
+class Test_ltm_egress_identifier_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_LTM_EGRESS_IDENTIFIER_TLV
+ self.length = 8
+ self.egress_id_ui = 7
+ self.egress_id_mac = "11:22:33:44:55:66"
+ self.ins = cfm.ltm_egress_identifier_tlv(
+ self.length,
+ self.egress_id_ui,
+ self.egress_id_mac
+ )
+ self.form = '!BHH6s'
+ self.buf = struct.pack(
+ self.form,
+ self._type,
+ self.length,
+ self.egress_id_ui,
+ addrconv.mac.text_to_bin(self.egress_id_mac)
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.egress_id_ui, self.ins.egress_id_ui)
+ eq_(self.egress_id_mac, self.ins.egress_id_mac)
+
+ def test_parser(self):
+ _res = cfm.ltm_egress_identifier_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.egress_id_ui, res.egress_id_ui)
+ eq_(self.egress_id_mac, res.egress_id_mac)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.egress_id_ui, res[2])
+ eq_(addrconv.mac.text_to_bin(self.egress_id_mac), res[3])
+
+ def test_serialize_with_length_zero(self):
+ ins = cfm.ltm_egress_identifier_tlv(
+ 0,
+ self.egress_id_ui,
+ self.egress_id_mac
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.egress_id_ui, res[2])
+ eq_(addrconv.mac.text_to_bin(self.egress_id_mac), res[3])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 8, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.ltm_egress_identifier_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(
+ cfm.ltm_egress_identifier_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_LTM_EGRESS_IDENTIFIER_TLV)
+ eq_(res[1], 8)
+ eq_(res[2], 0)
+ eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
+
+
+class Test_ltr_egress_identifier_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_LTR_EGRESS_IDENTIFIER_TLV
+ self.length = 16
+ self.last_egress_id_ui = 7
+ self.last_egress_id_mac = "11:22:33:44:55:66"
+ self.next_egress_id_ui = 5
+ self.next_egress_id_mac = "33:11:33:aa:bb:cc"
+ self.ins = cfm.ltr_egress_identifier_tlv(self.length,
+ self.last_egress_id_ui,
+ self.last_egress_id_mac,
+ self.next_egress_id_ui,
+ self.next_egress_id_mac
+ )
+ self.form = '!BHH6sH6s'
+ self.buf = struct.pack(
+ self.form,
+ self._type,
+ self.length,
+ self.last_egress_id_ui,
+ addrconv.mac.text_to_bin(self.last_egress_id_mac),
+ self.next_egress_id_ui,
+ addrconv.mac.text_to_bin(self.next_egress_id_mac))
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.last_egress_id_ui, self.ins.last_egress_id_ui)
+ eq_(self.last_egress_id_mac, self.ins.last_egress_id_mac)
+ eq_(self.next_egress_id_ui, self.ins.next_egress_id_ui)
+ eq_(self.next_egress_id_mac, self.ins.next_egress_id_mac)
+
+ def test_parser(self):
+ _res = cfm.ltr_egress_identifier_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.last_egress_id_ui, res.last_egress_id_ui)
+ eq_(self.last_egress_id_mac, res.last_egress_id_mac)
+ eq_(self.next_egress_id_ui, res.next_egress_id_ui)
+ eq_(self.next_egress_id_mac, res.next_egress_id_mac)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.last_egress_id_ui, res[2])
+ eq_(addrconv.mac.text_to_bin(self.last_egress_id_mac), res[3])
+ eq_(self.next_egress_id_ui, res[4])
+ eq_(addrconv.mac.text_to_bin(self.next_egress_id_mac), res[5])
+
+ def test_serialize_with_length_zero(self):
+ ins = cfm.ltr_egress_identifier_tlv(0,
+ self.last_egress_id_ui,
+ self.last_egress_id_mac,
+ self.next_egress_id_ui,
+ self.next_egress_id_mac
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.last_egress_id_ui, res[2])
+ eq_(addrconv.mac.text_to_bin(self.last_egress_id_mac), res[3])
+ eq_(self.next_egress_id_ui, res[4])
+ eq_(addrconv.mac.text_to_bin(self.next_egress_id_mac), res[5])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 16, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.ltr_egress_identifier_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.ltr_egress_identifier_tlv._PACK_STR,
+ str(buf))
+ eq_(res[0], cfm.CFM_LTR_EGRESS_IDENTIFIER_TLV)
+ eq_(res[1], 16)
+ eq_(res[2], 0)
+ eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
+ eq_(res[4], 0)
+ eq_(res[5], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
+
+
+class Test_organization_specific_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_ORGANIZATION_SPECIFIC_TLV
+ self.length = 10
+ self.oui = "\xff\x12\x34"
+ self.subtype = 3
+ self.value = "\x01\x02\x0f\x0e\x0d\x0c"
+ self.ins = cfm.organization_specific_tlv(self.length,
+ self.oui,
+ self.subtype,
+ self.value
+ )
+ self.form = '!BH3sB6s'
+ self.buf = struct.pack(self.form,
+ self._type,
+ self.length,
+ self.oui,
+ self.subtype,
+ self.value
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.oui, self.ins.oui)
+ eq_(self.subtype, self.ins.subtype)
+ eq_(self.value, self.ins.value)
+
+ def test_parser(self):
+ _res = cfm.organization_specific_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.oui, res.oui)
+ eq_(self.subtype, res.subtype)
+ eq_(self.value, res.value)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.oui, res[2])
+ eq_(self.subtype, res[3])
+ eq_(self.value, res[4])
+
+ def test_serialize_with_zero(self):
+ ins = cfm.organization_specific_tlv(0,
+ self.oui,
+ self.subtype,
+ self.value
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.oui, res[2])
+ eq_(self.subtype, res[3])
+ eq_(self.value, res[4])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 10, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.organization_specific_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.organization_specific_tlv._PACK_STR,
+ str(buf))
+ eq_(res[0], cfm.CFM_ORGANIZATION_SPECIFIC_TLV)
+ eq_(res[1], 4)
+ eq_(res[2], "\x00\x00\x00")
+ eq_(res[3], 0)
+
+
+class Test_reply_ingress_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_REPLY_INGRESS_TLV
+ self.length = 12
+ self.action = 2
+ self.mac_address = 'aa:bb:cc:56:34:12'
+ self.port_id_length = 3
+ self.port_id_subtype = 2
+ self.port_id = "\x01\x04\x09"
+ self.ins = cfm.reply_ingress_tlv(self.length, self.action,
+ self.mac_address,
+ self.port_id_length,
+ self.port_id_subtype,
+ self.port_id
+ )
+ self.form = '!BHB6sBB3s'
+ self.buf = struct.pack(self.form,
+ self._type,
+ self.length,
+ self.action,
+ addrconv.mac.text_to_bin(self.mac_address),
+ self.port_id_length,
+ self.port_id_subtype,
+ self.port_id
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.action, self.ins.action)
+ eq_(self.mac_address, self.ins.mac_address)
+ eq_(self.port_id_length, self.ins.port_id_length)
+ eq_(self.port_id_subtype, self.ins.port_id_subtype)
+ eq_(self.port_id, self.ins.port_id)
+
+ def test_parser(self):
+ _res = cfm.reply_ingress_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.action, res.action)
+ eq_(self.mac_address, res.mac_address)
+ eq_(self.port_id_length, res.port_id_length)
+ eq_(self.port_id_subtype, res.port_id_subtype)
+ eq_(self.port_id, res.port_id)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.action, res[2])
+ eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
+ eq_(self.port_id_length, res[4])
+ eq_(self.port_id_subtype, res[5])
+ eq_(self.port_id, res[6])
+
+ def test_serialize_with_zero(self):
+ ins = cfm.reply_ingress_tlv(0,
+ self.action,
+ self.mac_address,
+ 0,
+ self.port_id_subtype,
+ self.port_id
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.action, res[2])
+ eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
+ eq_(self.port_id_length, res[4])
+ eq_(self.port_id_subtype, res[5])
+ eq_(self.port_id, res[6])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 12, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.reply_ingress_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.reply_ingress_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_REPLY_INGRESS_TLV)
+ eq_(res[1], 7)
+ eq_(res[2], 1)
+ eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
+
+
+class Test_reply_egress_tlv(unittest.TestCase):
+
+ def setUp(self):
+ self._type = cfm.CFM_REPLY_EGRESS_TLV
+ self.length = 12
+ self.action = 2
+ self.mac_address = 'aa:bb:cc:56:34:12'
+ self.port_id_length = 3
+ self.port_id_subtype = 2
+ self.port_id = "\x01\x04\x09"
+ self.ins = cfm.reply_egress_tlv(self.length,
+ self.action,
+ self.mac_address,
+ self.port_id_length,
+ self.port_id_subtype,
+ self.port_id
+ )
+ self.form = '!BHB6sBB3s'
+ self.buf = struct.pack(self.form,
+ self._type,
+ self.length,
+ self.action,
+ addrconv.mac.text_to_bin(self.mac_address),
+ self.port_id_length,
+ self.port_id_subtype,
+ self.port_id
+ )
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.length, self.ins.length)
+ eq_(self.action, self.ins.action)
+ eq_(self.mac_address, self.ins.mac_address)
+ eq_(self.port_id_length, self.ins.port_id_length)
+ eq_(self.port_id_subtype, self.ins.port_id_subtype)
+ eq_(self.port_id, self.ins.port_id)
+
+ def test_parser(self):
+ _res = cfm.reply_ingress_tlv.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+ eq_(self.length, res.length)
+ eq_(self.action, res.action)
+ eq_(self.mac_address, res.mac_address)
+ eq_(self.port_id_length, res.port_id_length)
+ eq_(self.port_id_subtype, res.port_id_subtype)
+ eq_(self.port_id, res.port_id)
+
+ def test_serialize(self):
+ buf = self.ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.action, res[2])
+ eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
+ eq_(self.port_id_length, res[4])
+ eq_(self.port_id_subtype, res[5])
+ eq_(self.port_id, res[6])
+
+ def test_serialize_with_zero(self):
+ ins = cfm.reply_egress_tlv(0,
+ self.action,
+ self.mac_address,
+ 0,
+ self.port_id_subtype,
+ self.port_id
+ )
+ buf = ins.serialize()
+ res = struct.unpack_from(self.form, str(buf))
+ eq_(self._type, res[0])
+ eq_(self.length, res[1])
+ eq_(self.action, res[2])
+ eq_(addrconv.mac.text_to_bin(self.mac_address), res[3])
+ eq_(self.port_id_length, res[4])
+ eq_(self.port_id_subtype, res[5])
+ eq_(self.port_id, res[6])
+
+ def test_len(self):
+ # tlv_length = type_len + length_len + value_len
+ eq_(1 + 2 + 12, len(self.ins))
+
+ def test_default_args(self):
+ ins = cfm.reply_egress_tlv()
+ buf = ins.serialize()
+ res = struct.unpack_from(cfm.reply_egress_tlv._PACK_STR, str(buf))
+ eq_(res[0], cfm.CFM_REPLY_EGRESS_TLV)
+ eq_(res[1], 7)
+ eq_(res[2], 1)
+ eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00'))