diff options
author | takahashi.minoru <takahashi.minoru7@gmail.com> | 2014-06-06 14:32:17 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-06-10 21:11:52 +0900 |
commit | cfdce1f34043a0e245999e0515be3bbc3f5f0316 (patch) | |
tree | 5dc551a5ecd2817be6b9252af86c8a57d6ce8e5c | |
parent | ed25eefa846e2f8b7a6683908f224efc2bc0acc9 (diff) |
packet lib: add Connectivity Fault Management Protocol(CFM, IEEE 802.1ag)
Signed-off-by: TAKAHASHI Minoru <takahashi.minoru7@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | doc/source/library_packet_ref.rst | 3 | ||||
-rw-r--r-- | ryu/lib/packet/cfm.py | 1371 | ||||
-rw-r--r-- | ryu/lib/packet/vlan.py | 2 | ||||
-rw-r--r-- | ryu/ofproto/ether.py | 1 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_cfm.py | 1760 |
5 files changed, 3137 insertions, 0 deletions
diff --git a/doc/source/library_packet_ref.rst b/doc/source/library_packet_ref.rst index a39aa563..7048e72c 100644 --- a/doc/source/library_packet_ref.rst +++ b/doc/source/library_packet_ref.rst @@ -50,6 +50,9 @@ Protocol Header classes .. automodule:: ryu.lib.packet.icmpv6 :members: +.. automodule:: ryu.lib.packet.cfm + :members: + .. automodule:: ryu.lib.packet.tcp :members: diff --git a/ryu/lib/packet/cfm.py b/ryu/lib/packet/cfm.py new file mode 100644 index 00000000..8e5717e5 --- /dev/null +++ b/ryu/lib/packet/cfm.py @@ -0,0 +1,1371 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six +import struct +from ryu.lib import addrconv +from ryu.lib import stringify +from ryu.lib.packet import packet_base + +# IEEE 802.1ag OpCode +CFM_CC_MESSAGE = 0x01 +CFM_LOOPBACK_REPLY = 0x02 +CFM_LOOPBACK_MESSAGE = 0x03 +CFM_LINK_TRACE_REPLY = 0x04 +CFM_LINK_TRACE_MESSAGE = 0x05 + +# IEEE 802.1ag TLV type +CFM_END_TLV = 0x00 +CFM_SENDER_ID_TLV = 0x01 +CFM_PORT_STATUS_TLV = 0x02 +CFM_DATA_TLV = 0x03 +CFM_INTERFACE_STATUS_TLV = 0x04 +CFM_REPLY_INGRESS_TLV = 0x05 +CFM_REPLY_EGRESS_TLV = 0x06 +CFM_LTM_EGRESS_IDENTIFIER_TLV = 0x07 +CFM_LTR_EGRESS_IDENTIFIER_TLV = 0x08 +CFM_ORGANIZATION_SPECIFIC_TLV = 0x1f + +# IEEE 802.1ag CFM version +CFM_VERSION = 0 + + +class cfm(packet_base.PacketBase): + """CFM (Connectivity Fault Management) Protocol header class. + + http://standards.ieee.org/getieee802/download/802.1ag-2007.pdf + + OpCode Field range assignments + + +---------------+--------------------------------------------------+ + | OpCode range | CFM PDU or organization | + +===============+==================================================+ + | 0 | Reserved for IEEE 802.1 | + +---------------+--------------------------------------------------+ + | 1 | Continuity Check Message (CCM) | + +---------------+--------------------------------------------------+ + | 2 | Loopback Reply (LBR) | + +---------------+--------------------------------------------------+ + | 3 | Loopback Message (LBM) | + +---------------+--------------------------------------------------+ + | 4 | Linktrace Reply (LTR) | + +---------------+--------------------------------------------------+ + | 5 | Linktrace Message (LTM) | + +---------------+--------------------------------------------------+ + | 06 - 31 | Reserved for IEEE 802.1 | + +---------------+--------------------------------------------------+ + | 32 - 63 | Defined by ITU-T Y.1731 | + +---------------+--------------------------------------------------+ + | 64 - 255 | Reserved for IEEE 802.1. | + +---------------+--------------------------------------------------+ + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ============== ======================================== + Attribute Description + ============== ======================================== + op CFM PDU + ============== ======================================== + + """ + _PACK_STR = '!B' + _MIN_LEN = struct.calcsize(_PACK_STR) + _CFM_OPCODE = {} + _TYPE = { + 'ascii': [ + 'ltm_orig_addr', 'ltm_targ_addr' + ] + } + + @staticmethod + def register_cfm_opcode(type_): + def _register_cfm_opcode(cls): + cfm._CFM_OPCODE[type_] = cls + return cls + return _register_cfm_opcode + + def __init__(self, op=None): + super(cfm, self).__init__() + assert isinstance(op, operation) + self.op = op + + @classmethod + def parser(cls, buf): + (opcode, ) = struct.unpack_from(cls._PACK_STR, buf, cls._MIN_LEN) + cls_ = cls._CFM_OPCODE.get(opcode) + op = cls_.parser(buf) + instance = cls(op) + rest = buf[len(instance):] + return instance, None, rest + + def serialize(self, payload, prev): + buf = self.op.serialize() + return buf + + def __len__(self): + return len(self.op) + + +@six.add_metaclass(abc.ABCMeta) +class operation(stringify.StringifyMixin): + + _TLV_TYPES = {} + _END_TLV_LEN = 1 + + @staticmethod + def register_tlv_types(type_): + def _register_tlv_types(cls): + operation._TLV_TYPES[type_] = cls + return cls + return _register_tlv_types + + def __init__(self, md_lv, version, tlvs): + self.md_lv = md_lv + self.version = version + tlvs = tlvs or [] + assert isinstance(tlvs, list) + for tlv_ in tlvs: + assert isinstance(tlv_, tlv) + self.tlvs = tlvs + + @classmethod + @abc.abstractmethod + def parser(cls, buf): + pass + + @abc.abstractmethod + def serialize(self): + pass + + @abc.abstractmethod + def __len__(self): + pass + + @classmethod + def _parser_tlvs(cls, buf): + offset = 0 + tlvs = [] + while True: + (type_, ) = struct.unpack_from('!B', buf, offset) + cls_ = cls._TLV_TYPES.get(type_) + if not cls_: + assert type_ is CFM_END_TLV + break + tlv_ = cls_.parser(buf[offset:]) + tlvs.append(tlv_) + offset += len(tlv_) + return tlvs + + @staticmethod + def _serialize_tlvs(tlvs): + buf = bytearray() + for tlv_ in tlvs: + buf.extend(tlv_.serialize()) + return buf + + def _calc_len(self, len_): + for tlv_ in self.tlvs: + len_ += len(tlv_) + len_ += self._END_TLV_LEN + return len_ + + +@cfm.register_cfm_opcode(CFM_CC_MESSAGE) +class cc_message(operation): + + """CFM (IEEE Std 802.1ag-2007) Continuity Check Message (CCM) + encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + md_lv Maintenance Domain Level. + version The protocol version number. + rdi RDI bit. + interval CCM Interval.The default is 4 (1 frame/s) + seq_num Sequence Number. + mep_id Maintenance association End Point Identifier. + md_name_format Maintenance Domain Name Format. + The default is 4 (Character string) + md_name_length Maintenance Domain Name Length. + (0 means automatically-calculate + when encoding.) + md_name Maintenance Domain Name. + short_ma_name_format Short MA Name Format. + The default is 2 (Character string) + short_ma_name_length Short MA Name Format Length. + (0 means automatically-calculate + when encoding.) + short_ma_name Short MA Name. + tlvs TLVs. + ==================== ======================================= + """ + + _PACK_STR = '!4BIHB' + + _MIN_LEN = struct.calcsize(_PACK_STR) + _TLV_OFFSET = 70 + _MD_NAME_FORMAT_LEN = 1 + _MD_NAME_LENGTH_LEN = 1 + _SHORT_MA_NAME_FORMAT_LEN = 1 + _SHORT_MA_NAME_LENGTH_LEN = 1 + _MA_ID_LEN = 64 + + # Maintenance Domain Name Format + _MD_FMT_NO_MD_NAME_PRESENT = 1 + _MD_FMT_DOMAIN_NAME_BASED_STRING = 2 + _MD_FMT_MAC_ADDRESS_TWO_OCTET_INTEGER = 3 + _MD_FMT_CHARACTER_STRING = 4 + + # Short MA Name Format + _SHORT_MA_FMT_PRIMARY_VID = 1 + _SHORT_MA_FMT_CHARACTER_STRING = 2 + _SHORT_MA_FMT_TWO_OCTET_INTEGER = 3 + _SHORT_MA_FMT_RFC_2685_VPN_ID = 4 + + # CCM Transmission Interval + _INTERVAL_300_HZ = 1 + _INTERVAL_10_MSEC = 2 + _INTERVAL_100_MSEC = 3 + _INTERVAL_1_SEC = 4 + _INTERVAL_10_SEC = 5 + _INTERVAL_1_MIN = 6 + _INTERVAL_10_MIN = 6 + + def __init__(self, md_lv=0, version=CFM_VERSION, + rdi=0, interval=_INTERVAL_1_SEC, seq_num=0, mep_id=1, + md_name_format=_MD_FMT_CHARACTER_STRING, + md_name_length=0, md_name="0", + short_ma_name_format=_SHORT_MA_FMT_CHARACTER_STRING, + short_ma_name_length=0, short_ma_name="1", + tlvs=None): + super(cc_message, self).__init__(md_lv, version, tlvs) + self._opcode = CFM_CC_MESSAGE + assert rdi in [0, 1] + self.rdi = rdi + assert interval is not 0 + self.interval = interval + self.seq_num = seq_num + assert 1 <= mep_id <= 8191 + self.mep_id = mep_id + self.md_name_format = md_name_format + self.md_name_length = md_name_length + self.md_name = md_name + self.short_ma_name_format = short_ma_name_format + self.short_ma_name_length = short_ma_name_length + self.short_ma_name = short_ma_name + + @classmethod + def parser(cls, buf): + (md_lv_version, opcode, flags, tlv_offset, seq_num, mep_id, + md_name_format) = struct.unpack_from(cls._PACK_STR, buf) + md_name_length = 0 + md_name = "" + md_lv = int(md_lv_version >> 5) + version = int(md_lv_version & 0x1f) + rdi = int(flags >> 7) + interval = int(flags & 0x07) + offset = cls._MIN_LEN + # parse md_name + if md_name_format != cls._MD_FMT_NO_MD_NAME_PRESENT: + (md_name_length, ) = struct.unpack_from("!B", buf, offset) + offset += cls._MD_NAME_LENGTH_LEN + form = "%dB" % md_name_length + md_name = struct.unpack_from(form, buf, offset) + offset += md_name_length + # parse short_ma_name + (short_ma_name_format, + short_ma_name_length) = struct.unpack_from("!2B", buf, offset) + offset += (cls._SHORT_MA_NAME_FORMAT_LEN + + cls._SHORT_MA_NAME_LENGTH_LEN) + form = "%dB" % short_ma_name_length + short_ma_name = struct.unpack_from(form, buf, offset) + offset = cls._MIN_LEN + (cls._MA_ID_LEN - cls._MD_NAME_FORMAT_LEN) + tlvs = cls._parser_tlvs(buf[offset:]) + # ascii to text + if md_name_format == cls._MD_FMT_DOMAIN_NAME_BASED_STRING or \ + md_name_format == cls._MD_FMT_CHARACTER_STRING: + md_name = "".join(map(chr, md_name)) + if short_ma_name_format == cls._SHORT_MA_FMT_CHARACTER_STRING: + short_ma_name = "".join(map(chr, short_ma_name)) + return cls(md_lv, version, rdi, interval, seq_num, mep_id, + md_name_format, md_name_length, + md_name, + short_ma_name_format, short_ma_name_length, + short_ma_name, + tlvs) + + def serialize(self): + buf = struct.pack(self._PACK_STR, + (self.md_lv << 5) | self.version, + self._opcode, + (self.rdi << 7) | self.interval, + self._TLV_OFFSET, + self.seq_num, self.mep_id, self.md_name_format) + buf = bytearray(buf) + # Maintenance Domain Name + if self.md_name_format != self._MD_FMT_NO_MD_NAME_PRESENT: + if self.md_name_length == 0: + self.md_name_length = len(self.md_name) + buf.extend(struct.pack('!B%ds' % self.md_name_length, + self.md_name_length, self.md_name)) + # Short MA Name + if self.short_ma_name_length == 0: + self.short_ma_name_length = len(self.short_ma_name) + buf.extend(struct.pack('!2B%ds' % self.short_ma_name_length, + self.short_ma_name_format, + self.short_ma_name_length, + self.short_ma_name + )) + # 0 pad + maid_length = (self._MD_NAME_FORMAT_LEN + + self._SHORT_MA_NAME_FORMAT_LEN + + self._SHORT_MA_NAME_LENGTH_LEN + + self.short_ma_name_length) + if self.md_name_format != self._MD_FMT_NO_MD_NAME_PRESENT: + maid_length += self._MD_NAME_LENGTH_LEN + self.md_name_length + buf.extend(bytearray(self._MA_ID_LEN - maid_length)) + # tlvs + if self.tlvs: + buf.extend(self._serialize_tlvs(self.tlvs)) + buf.extend(struct.pack("!B", CFM_END_TLV)) + return buf + + def __len__(self): + return self._calc_len( + (self._MIN_LEN - self._MD_NAME_FORMAT_LEN) + self._MA_ID_LEN) + + +class loopback(operation): + + _PACK_STR = '!4BI' + _MIN_LEN = struct.calcsize(_PACK_STR) + _TLV_OFFSET = 4 + + @abc.abstractmethod + def __init__(self, md_lv, version, transaction_id, tlvs): + super(loopback, self).__init__(md_lv, version, tlvs) + self._flags = 0 + self.transaction_id = transaction_id + + @classmethod + def parser(cls, buf): + (md_lv_version, opcode, flags, tlv_offset, + transaction_id) = struct.unpack_from(cls._PACK_STR, buf) + md_lv = int(md_lv_version >> 5) + version = int(md_lv_version & 0x1f) + tlvs = cls._parser_tlvs(buf[cls._MIN_LEN:]) + return cls(md_lv, version, transaction_id, tlvs) + + def serialize(self): + buf = struct.pack(self._PACK_STR, + (self.md_lv << 5) | self.version, + self._opcode, + self._flags, + self._TLV_OFFSET, + self.transaction_id, + ) + buf = bytearray(buf) + # tlvs + if self.tlvs: + buf.extend(self._serialize_tlvs(self.tlvs)) + buf.extend(struct.pack("!B", CFM_END_TLV)) + + return buf + + def __len__(self): + return self._calc_len(self._MIN_LEN) + + +@cfm.register_cfm_opcode(CFM_LOOPBACK_MESSAGE) +class loopback_message(loopback): + + """CFM (IEEE Std 802.1ag-2007) Loopback Message (LBM) encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ================= ======================================= + Attribute Description + ================= ======================================= + md_lv Maintenance Domain Level. + version The protocol version number. + transaction_id Loopback Transaction Identifier. + tlvs TLVs. + ================= ======================================= + """ + + def __init__(self, md_lv=0, version=CFM_VERSION, + transaction_id=0, + tlvs=None, + ): + super(loopback_message, self).__init__(md_lv, version, + transaction_id, + tlvs) + self._opcode = CFM_LOOPBACK_MESSAGE + + +@cfm.register_cfm_opcode(CFM_LOOPBACK_REPLY) +class loopback_reply(loopback): + + """CFM (IEEE Std 802.1ag-2007) Loopback Reply (LBR) encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + md_lv Maintenance Domain Level. + version The protocol version number. + transaction_id Loopback Transaction Identifier. + tlvs TLVs. + ==================== ======================================= + """ + + def __init__(self, md_lv=0, version=CFM_VERSION, + transaction_id=0, + tlvs=None, + ): + super(loopback_reply, self).__init__(md_lv, version, + transaction_id, + tlvs) + self._opcode = CFM_LOOPBACK_REPLY + + +class link_trace(operation): + + @abc.abstractmethod + def __init__(self, md_lv, version, use_fdb_only, + transaction_id, ttl, tlvs): + super(link_trace, self).__init__(md_lv, version, tlvs) + assert use_fdb_only in [0, 1] + self.use_fdb_only = use_fdb_only + self.transaction_id = transaction_id + self.ttl = ttl + + @classmethod + @abc.abstractmethod + def parser(cls, buf): + pass + + @abc.abstractmethod + def serialize(self): + pass + + def __len__(self): + return self._calc_len(self._MIN_LEN) + + +@cfm.register_cfm_opcode(CFM_LINK_TRACE_MESSAGE) +class link_trace_message(link_trace): + + """CFM (IEEE Std 802.1ag-2007) Linktrace Message (LTM) + encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + md_lv Maintenance Domain Level. + version The protocol version number. + use_fdb_only UseFDBonly bit. + transaction_id LTM Transaction Identifier. + ttl LTM TTL. + ltm_orig_addr Original MAC Address. + ltm_targ_addr Target MAC Address. + tlvs TLVs. + ==================== ======================================= + """ + + _PACK_STR = '!4BIB6s6s' + _ALL_PACK_LEN = struct.calcsize(_PACK_STR) + _MIN_LEN = _ALL_PACK_LEN + _TLV_OFFSET = 17 + + def __init__(self, md_lv=0, version=CFM_VERSION, + use_fdb_only=1, + transaction_id=0, + ttl=64, + ltm_orig_addr='00:00:00:00:00:00', + ltm_targ_addr='00:00:00:00:00:00', + tlvs=None + ): + super(link_trace_message, self).__init__(md_lv, version, + use_fdb_only, + transaction_id, + ttl, + tlvs) + self._opcode = CFM_LINK_TRACE_MESSAGE + self.ltm_orig_addr = ltm_orig_addr + self.ltm_targ_addr = ltm_targ_addr + + @classmethod + def parser(cls, buf): + (md_lv_version, opcode, flags, tlv_offset, transaction_id, ttl, + ltm_orig_addr, ltm_targ_addr) = struct.unpack_from(cls._PACK_STR, buf) + md_lv = int(md_lv_version >> 5) + version = int(md_lv_version & 0x1f) + use_fdb_only = int(flags >> 7) + tlvs = cls._parser_tlvs(buf[cls._MIN_LEN:]) + return cls(md_lv, version, use_fdb_only, + transaction_id, ttl, + addrconv.mac.bin_to_text(ltm_orig_addr), + addrconv.mac.bin_to_text(ltm_targ_addr), + tlvs) + + def serialize(self): + buf = struct.pack(self._PACK_STR, + (self.md_lv << 5) | self.version, + self._opcode, + self.use_fdb_only << 7, + self._TLV_OFFSET, + self.transaction_id, + self.ttl, + addrconv.mac.text_to_bin(self.ltm_orig_addr), + addrconv.mac.text_to_bin(self.ltm_targ_addr), + ) + buf = bytearray(buf) + if self.tlvs: + buf.extend(self._serialize_tlvs(self.tlvs)) + buf.extend(struct.pack("!B", CFM_END_TLV)) + return buf + + +@cfm.register_cfm_opcode(CFM_LINK_TRACE_REPLY) +class link_trace_reply(link_trace): + + """CFM (IEEE Std 802.1ag-2007) Linktrace Reply (LTR) encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + version The protocol version number. + use_fdb_only UseFDBonly bit. + fwd_yes FwdYes bit. + terminal_mep TerminalMep bit. + transaction_id LTR Transaction Identifier. + ttl Reply TTL. + relay_action Relay Action.The default is 1 (RlyHit) + tlvs TLVs. + ==================== ======================================= + """ + _PACK_STR = '!4BIBB' + _ALL_PACK_LEN = struct.calcsize(_PACK_STR) + _MIN_LEN = _ALL_PACK_LEN + _TLV_OFFSET = 6 + + # Relay Action field values + _RLY_HIT = 1 + _RLY_FDB = 2 + _RLY_MPDB = 3 + + def __init__(self, md_lv=0, version=CFM_VERSION, use_fdb_only=1, + fwd_yes=0, terminal_mep=1, transaction_id=0, + ttl=64, relay_action=_RLY_HIT, tlvs=None + ): + super(link_trace_reply, self).__init__(md_lv, version, + use_fdb_only, + transaction_id, + ttl, + tlvs) + self._opcode = CFM_LINK_TRACE_REPLY + assert fwd_yes in [0, 1] + self.fwd_yes = fwd_yes + assert terminal_mep in [0, 1] + self.terminal_mep = terminal_mep + assert relay_action in [self._RLY_HIT, self._RLY_FDB, self._RLY_MPDB] + self.relay_action = relay_action + + @classmethod + def parser(cls, buf): + (md_lv_version, opcode, flags, tlv_offset, transaction_id, ttl, + relay_action) = struct.unpack_from(cls._PACK_STR, buf) + md_lv = int(md_lv_version >> 5) + version = int(md_lv_version & 0x1f) + use_fdb_only = int(flags >> 7) + fwd_yes = int(flags >> 6 & 0x01) + terminal_mep = int(flags >> 5 & 0x01) + tlvs = cls._parser_tlvs(buf[cls._MIN_LEN:]) + return cls(md_lv, version, use_fdb_only, fwd_yes, terminal_mep, + transaction_id, ttl, relay_action, tlvs) + + def serialize(self): + buf = struct.pack(self._PACK_STR, + (self.md_lv << 5) | self.version, + self._opcode, + (self.use_fdb_only << 7) | + (self.fwd_yes << 6) | + (self.terminal_mep << 5), + self._TLV_OFFSET, + self.transaction_id, + self.ttl, + self.relay_action, + ) + buf = bytearray(buf) + if self.tlvs: + buf.extend(self._serialize_tlvs(self.tlvs)) + buf.extend(struct.pack("!B", CFM_END_TLV)) + return buf + + +cfm.set_classes(cfm._CFM_OPCODE) + + +@six.add_metaclass(abc.ABCMeta) +class tlv(stringify.StringifyMixin): + + _TYPE_LEN = 1 + _LENGTH_LEN = 2 + _TYPE = { + 'ascii': [ + 'egress_id_mac', 'last_egress_id_mac', + 'next_egress_id_mac', 'mac_address' + ] + } + + def __init__(self, length): + self.length = length + + @classmethod + @abc.abstractmethod + def parser(cls, buf): + pass + + @abc.abstractmethod + def serialize(self): + pass + + def __len__(self): + return self.length + self._TYPE_LEN + self._LENGTH_LEN + + +@operation.register_tlv_types(CFM_SENDER_ID_TLV) +class sender_id_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) Sender ID TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + chassis_id_length Chassis ID Length. + (0 means automatically-calculate when encoding.) + chassis_id_subtype Chassis ID Subtype. + The default is 4 (Mac Address) + chassis_id Chassis ID. + ma_domain_length Management Address Domain Length. + (0 means automatically-calculate when encoding.) + ma_domain Management Address Domain. + ma_length Management Address Length. + (0 means automatically-calculate when encoding.) + ma Management Address. + ==================== ======================================= + """ + + _PACK_STR = '!BHB' + _MIN_LEN = struct.calcsize(_PACK_STR) + _CHASSIS_ID_LENGTH_LEN = 1 + _CHASSIS_ID_SUBTYPE_LEN = 1 + _MA_DOMAIN_LENGTH_LEN = 1 + _MA_LENGTH_LEN = 1 + + # Chassis ID subtype enumeration + _CHASSIS_ID_CHASSIS_COMPONENT = 1 + _CHASSIS_ID_INTERFACE_ALIAS = 2 + _CHASSIS_ID_PORT_COMPONENT = 3 + _CHASSIS_ID_MAC_ADDRESS = 4 + _CHASSIS_ID_NETWORK_ADDRESS = 5 + _CHASSIS_ID_INTERFACE_NAME = 6 + _CHASSIS_ID_LOCALLY_ASSIGNED = 7 + + def __init__(self, + length=0, + chassis_id_length=0, + chassis_id_subtype=_CHASSIS_ID_MAC_ADDRESS, + chassis_id="", + ma_domain_length=0, + ma_domain="", + ma_length=0, + ma="" + ): + super(sender_id_tlv, self).__init__(length) + self._type = CFM_SENDER_ID_TLV + self.chassis_id_length = chassis_id_length + assert chassis_id_subtype in [ + self._CHASSIS_ID_CHASSIS_COMPONENT, + self._CHASSIS_ID_INTERFACE_ALIAS, + self._CHASSIS_ID_PORT_COMPONENT, + self._CHASSIS_ID_MAC_ADDRESS, + self._CHASSIS_ID_NETWORK_ADDRESS, + self._CHASSIS_ID_INTERFACE_NAME, + self._CHASSIS_ID_LOCALLY_ASSIGNED] + self.chassis_id_subtype = chassis_id_subtype + self.chassis_id = chassis_id + self.ma_domain_length = ma_domain_length + self.ma_domain = ma_domain + self.ma_length = ma_length + self.ma = ma + + @classmethod + def parser(cls, buf): + (type_, length, chassis_id_length) = struct.unpack_from(cls._PACK_STR, + buf) + chassis_id_subtype = 4 + chassis_id = "" + ma_domain_length = 0 + ma_domain = "" + ma_length = 0 + ma = "" + offset = cls._MIN_LEN + if chassis_id_length != 0: + (chassis_id_subtype, ) = struct.unpack_from("!B", buf, offset) + offset += cls._CHASSIS_ID_SUBTYPE_LEN + form = "%ds" % chassis_id_length + (chassis_id,) = struct.unpack_from(form, buf, offset) + offset += chassis_id_length + if length + (cls._TYPE_LEN + cls._LENGTH_LEN) > offset: + (ma_domain_length, ) = struct.unpack_from("!B", buf, offset) + offset += cls._MA_DOMAIN_LENGTH_LEN + form = "%ds" % ma_domain_length + (ma_domain, ) = struct.unpack_from(form, buf, offset) + offset += ma_domain_length + if length + (cls._TYPE_LEN + cls._LENGTH_LEN) > offset: + (ma_length, ) = struct.unpack_from("!B", buf, offset) + offset += cls._MA_LENGTH_LEN + form = "%ds" % ma_length + (ma, ) = struct.unpack_from(form, buf, offset) + return cls(length, chassis_id_length, chassis_id_subtype, + chassis_id, ma_domain_length, ma_domain, ma_length, ma) + + def serialize(self): + # calculate length when it contains 0 + if self.chassis_id_length == 0: + self.chassis_id_length = len(self.chassis_id) + if self.ma_domain_length == 0: + self.ma_domain_length = len(self.ma_domain) + if self.ma_length == 0: + self.ma_length = len(self.ma) + if self.length == 0: + self.length += self._CHASSIS_ID_LENGTH_LEN + if self.chassis_id_length != 0: + self.length += (self._CHASSIS_ID_SUBTYPE_LEN + + self.chassis_id_length) + if self.chassis_id_length != 0 or self.ma_domain_length != 0: + self.length += self._MA_DOMAIN_LENGTH_LEN + if self.ma_domain_length != 0: + self.length += (self.ma_domain_length + + self._MA_LENGTH_LEN + self.ma_length) + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.chassis_id_length + ) + buf = bytearray(buf) + # Chassis ID Subtype and Chassis ID present + # if the Chassis ID Length field contains not 0. + if self.chassis_id_length != 0: + buf.extend(struct.pack("!B", self.chassis_id_subtype)) + form = "%ds" % self.chassis_id_length + buf.extend(struct.pack(form, self.chassis_id)) + # Management Address Domain Length present + # if the Chassis ID Length field or Management Address Length field + # contains not 0. + if self.chassis_id_length != 0 or self.ma_domain_length != 0: + buf.extend(struct.pack("!B", self.ma_domain_length)) + # Management Address Domain present + # Management Address Domain Length field contains not 0. + if self.ma_domain_length != 0: + form = "%ds" % self.ma_domain_length + buf.extend(struct.pack(form, self.ma_domain)) + buf.extend(struct.pack("!B", self.ma_length)) + # Management Address present + # Management Address Length field contains not 0. + if self.ma_length != 0: + form = "%ds" % self.ma_length + buf.extend(struct.pack(form, self.ma)) + return buf + + +@operation.register_tlv_types(CFM_PORT_STATUS_TLV) +class port_status_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) Port Status TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + port_status Port Status.The default is 1 (psUp) + ==================== ======================================= + """ + + _PACK_STR = '!BHB' + _MIN_LEN = struct.calcsize(_PACK_STR) + + # Port Status TLV values + _PS_BLOCKED = 1 + _PS_UP = 2 + + def __init__(self, length=0, port_status=_PS_UP): + super(port_status_tlv, self).__init__(length) + self._type = CFM_PORT_STATUS_TLV + assert port_status in [self._PS_BLOCKED, self._PS_UP] + self.port_status = port_status + + @classmethod + def parser(cls, buf): + (type_, length, + port_status) = struct.unpack_from(cls._PACK_STR, buf) + return cls(length, port_status) + + def serialize(self): + # calculate length when it contains 0 + if self.length == 0: + self.length = 1 + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.port_status) + return bytearray(buf) + + +@operation.register_tlv_types(CFM_DATA_TLV) +class data_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) Data TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ============== ======================================= + Attribute Description + ============== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding) + data_value Bit pattern of any of n octets.(n = length) + ============== ======================================= + """ + + _PACK_STR = '!BH' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, length=0, data_value="" + ): + super(data_tlv, self).__init__(length) + self._type = CFM_DATA_TLV + self.data_value = data_value + + @classmethod + def parser(cls, buf): + (type_, length) = struct.unpack_from(cls._PACK_STR, buf) + form = "%ds" % length + (data_value, ) = struct.unpack_from(form, buf, cls._MIN_LEN) + return cls(length, data_value) + + def serialize(self): + # calculate length when it contains 0 + if self.length == 0: + self.length = len(self.data_value) + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length) + buf = bytearray(buf) + form = "%ds" % self.length + buf.extend(struct.pack(form, self.data_value)) + return buf + + +@operation.register_tlv_types(CFM_INTERFACE_STATUS_TLV) +class interface_status_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) Interface Status TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + interface_status Interface Status.The default is 1 (isUp) + ==================== ======================================= + """ + + _PACK_STR = '!BHB' + _MIN_LEN = struct.calcsize(_PACK_STR) + + # Interface Status TLV values + _IS_UP = 1 + _IS_DOWN = 2 + _IS_TESTING = 3 + _IS_UNKNOWN = 4 + _IS_DORMANT = 5 + _IS_NOT_PRESENT = 6 + _IS_LOWER_LAYER_DOWN = 7 + + def __init__(self, length=0, interface_status=_IS_UP): + super(interface_status_tlv, self).__init__(length) + self._type = CFM_INTERFACE_STATUS_TLV + assert interface_status in [ + self._IS_UP, self._IS_DOWN, self._IS_TESTING, + self._IS_UNKNOWN, self._IS_DORMANT, self._IS_NOT_PRESENT, + self._IS_LOWER_LAYER_DOWN] + self.interface_status = interface_status + + @classmethod + def parser(cls, buf): + (type_, length, + interface_status) = struct.unpack_from(cls._PACK_STR, buf) + return cls(length, interface_status) + + def serialize(self): + # calculate length when it contains 0 + if self.length == 0: + self.length = 1 + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.interface_status) + return bytearray(buf) + + +@operation.register_tlv_types(CFM_LTM_EGRESS_IDENTIFIER_TLV) +class ltm_egress_identifier_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) LTM EGRESS TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ============== ======================================= + Attribute Description + ============== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + egress_id_ui Egress Identifier of Unique ID. + egress_id_mac Egress Identifier of MAC address. + ============== ======================================= + """ + + _PACK_STR = '!BHH6s' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, + length=0, + egress_id_ui=0, + egress_id_mac='00:00:00:00:00:00' + ): + super(ltm_egress_identifier_tlv, self).__init__(length) + self._type = CFM_LTM_EGRESS_IDENTIFIER_TLV + self.egress_id_ui = egress_id_ui + self.egress_id_mac = egress_id_mac + + @classmethod + def parser(cls, buf): + (type_, length, egress_id_ui, + egress_id_mac) = struct.unpack_from(cls._PACK_STR, buf) + return cls(length, egress_id_ui, + addrconv.mac.bin_to_text(egress_id_mac)) + + def serialize(self): + # calculate length when it contains 0 + if self.length == 0: + self.length = 8 + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.egress_id_ui, + addrconv.mac.text_to_bin(self.egress_id_mac) + ) + return bytearray(buf) + + +@operation.register_tlv_types(CFM_LTR_EGRESS_IDENTIFIER_TLV) +class ltr_egress_identifier_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) LTR EGRESS TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ==================== ======================================= + Attribute Description + ==================== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + last_egress_id_ui Last Egress Identifier of Unique ID. + last_egress_id_mac Last Egress Identifier of MAC address. + next_egress_id_ui Next Egress Identifier of Unique ID. + next_egress_id_mac Next Egress Identifier of MAC address. + ==================== ======================================= + """ + + _PACK_STR = '!BHH6sH6s' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, + length=0, + last_egress_id_ui=0, + last_egress_id_mac='00:00:00:00:00:00', + next_egress_id_ui=0, + next_egress_id_mac='00:00:00:00:00:00' + ): + super(ltr_egress_identifier_tlv, self).__init__(length) + self._type = CFM_LTR_EGRESS_IDENTIFIER_TLV + self.last_egress_id_ui = last_egress_id_ui + self.last_egress_id_mac = last_egress_id_mac + self.next_egress_id_ui = next_egress_id_ui + self.next_egress_id_mac = next_egress_id_mac + + @classmethod + def parser(cls, buf): + (type_, length, + last_egress_id_ui, last_egress_id_mac, + next_egress_id_ui, next_egress_id_mac + ) = struct.unpack_from(cls._PACK_STR, buf) + return cls(length, + last_egress_id_ui, + addrconv.mac.bin_to_text(last_egress_id_mac), + next_egress_id_ui, + addrconv.mac.bin_to_text(next_egress_id_mac)) + + def serialize(self): + # calculate length when it contains 0 + if self.length == 0: + self.length = 16 + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.last_egress_id_ui, + addrconv.mac.text_to_bin(self.last_egress_id_mac), + self.next_egress_id_ui, + addrconv.mac.text_to_bin(self.next_egress_id_mac) + ) + return bytearray(buf) + + +@operation.register_tlv_types(CFM_ORGANIZATION_SPECIFIC_TLV) +class organization_specific_tlv(tlv): + + """CFM (IEEE Std 802.1ag-2007) Organization Specific TLV + encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ============== ======================================= + Attribute Description + ============== ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + oui Organizationally Unique Identifier. + subtype Subtype. + value Value.(optional) + ============== ======================================= + """ + + _PACK_STR = '!BH3sB' + _MIN_LEN = struct.calcsize(_PACK_STR) + _OUI_AND_SUBTYPE_LEN = 4 + + def __init__(self, + length=0, + oui="\x00\x00\x00", + subtype=0, + value="" + ): + super(organization_specific_tlv, self).__init__(length) + self._type = CFM_ORGANIZATION_SPECIFIC_TLV + self.oui = oui + self.subtype = subtype + self.value = value + + @classmethod + def parser(cls, buf): + (type_, length, oui, subtype) = struct.unpack_from(cls._PACK_STR, buf) + value = "" + if length > cls._OUI_AND_SUBTYPE_LEN: + form = "%ds" % (length - cls._OUI_AND_SUBTYPE_LEN) + (value,) = struct.unpack_from(form, buf, cls._MIN_LEN) + return cls(length, oui, subtype, value) + + def serialize(self): + # calculate length when it contains 0 + if self.length == 0: + self.length = len(self.value) + self._OUI_AND_SUBTYPE_LEN + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.oui, + self.subtype, + ) + buf = bytearray(buf) + form = "%ds" % (self.length - self._OUI_AND_SUBTYPE_LEN) + buf.extend(struct.pack(form, self.value)) + return buf + + +class reply_tlv(tlv): + + _PACK_STR = '!BHB6s' + _MIN_LEN = struct.calcsize(_PACK_STR) + _MIN_VALUE_LEN = _MIN_LEN - struct.calcsize('!BH') + _PORT_ID_LENGTH_LEN = 1 + _PORT_ID_SUBTYPE_LEN = 1 + + def __init__(self, length, action, mac_address, port_id_length, + port_id_subtype, port_id): + super(reply_tlv, self).__init__(length) + self.action = action + self.mac_address = mac_address + self.port_id_length = port_id_length + self.port_id_subtype = port_id_subtype + self.port_id = port_id + + @classmethod + def parser(cls, buf): + (type_, length, action, + mac_address) = struct.unpack_from(cls._PACK_STR, buf) + port_id_length = 0 + port_id_subtype = 0 + port_id = "" + if length > cls._MIN_VALUE_LEN: + (port_id_length, + port_id_subtype) = struct.unpack_from('!2B', buf, cls._MIN_LEN) + form = "%ds" % port_id_length + (port_id,) = struct.unpack_from(form, buf, + cls._MIN_LEN + + cls._PORT_ID_LENGTH_LEN + + cls._PORT_ID_SUBTYPE_LEN) + return cls(length, action, + addrconv.mac.bin_to_text(mac_address), + port_id_length, port_id_subtype, port_id) + + def serialize(self): + # calculate length when it contains 0 + if self.port_id_length == 0: + self.port_id_length = len(self.port_id) + if self.length == 0: + self.length = self._MIN_VALUE_LEN + if self.port_id_length != 0: + self.length += (self.port_id_length + + self._PORT_ID_LENGTH_LEN + + self._PORT_ID_SUBTYPE_LEN) + # start serialize + buf = struct.pack(self._PACK_STR, + self._type, + self.length, + self.action, + addrconv.mac.text_to_bin(self.mac_address), + ) + buf = bytearray(buf) + if self.port_id_length != 0: + buf.extend(struct.pack("!BB", + self.port_id_length, + self.port_id_subtype)) + form = "%ds" % self.port_id_length + buf.extend(struct.pack(form, self.port_id)) + return buf + + +@operation.register_tlv_types(CFM_REPLY_INGRESS_TLV) +class reply_ingress_tlv(reply_tlv): + + """CFM (IEEE Std 802.1ag-2007) Reply Ingress TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ================= ======================================= + Attribute Description + ================= ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + action Ingress Action.The default is 1 (IngOK) + mac_address Ingress MAC Address. + port_id_length Ingress PortID Length. + (0 means automatically-calculate when encoding.) + port_id_subtype Ingress PortID Subtype. + port_id Ingress PortID. + ================= ======================================= + """ + + # Ingress Action field values + _ING_OK = 1 + _ING_DOWN = 2 + _ING_BLOCKED = 3 + _ING_VID = 4 + + def __init__(self, + length=0, + action=_ING_OK, + mac_address='00:00:00:00:00:00', + port_id_length=0, + port_id_subtype=0, + port_id="" + ): + super(reply_ingress_tlv, self).__init__(length, action, + mac_address, port_id_length, + port_id_subtype, port_id) + assert action in [self._ING_OK, self._ING_DOWN, + self._ING_BLOCKED, self._ING_VID] + self._type = CFM_REPLY_INGRESS_TLV + + +@operation.register_tlv_types(CFM_REPLY_EGRESS_TLV) +class reply_egress_tlv(reply_tlv): + + """CFM (IEEE Std 802.1ag-2007) Reply Egress TLV encoder/decoder class. + + This is used with ryu.lib.packet.cfm.cfm. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ================= ======================================= + Attribute Description + ================= ======================================= + length Length of Value field. + (0 means automatically-calculate when encoding.) + action Egress Action.The default is 1 (EgrOK) + mac_address Egress MAC Address. + port_id_length Egress PortID Length. + (0 means automatically-calculate when encoding.) + port_id_subtype Egress PortID Subtype. + port_id Egress PortID. + ================= ======================================= + """ + + # Egress Action field values + _EGR_OK = 1 + _EGR_DOWN = 2 + _EGR_BLOCKED = 3 + _EGR_VID = 4 + + def __init__(self, + length=0, + action=_EGR_OK, + mac_address='00:00:00:00:00:00', + port_id_length=0, + port_id_subtype=0, + port_id="" + ): + super(reply_egress_tlv, self).__init__(length, action, + mac_address, port_id_length, + port_id_subtype, port_id) + assert action in [self._EGR_OK, self._EGR_DOWN, + self._EGR_BLOCKED, self._EGR_VID] + self._type = CFM_REPLY_EGRESS_TLV + + +operation.set_classes(operation._TLV_TYPES) diff --git a/ryu/lib/packet/vlan.py b/ryu/lib/packet/vlan.py index a4f4332c..e44afce6 100644 --- a/ryu/lib/packet/vlan.py +++ b/ryu/lib/packet/vlan.py @@ -24,6 +24,7 @@ from . import lldp from . import slow from . import llc from . import pbb +from . import cfm from ryu.ofproto import ether @@ -121,6 +122,7 @@ vlan.register_packet_type(ipv6.ipv6, ether.ETH_TYPE_IPV6) vlan.register_packet_type(lldp.lldp, ether.ETH_TYPE_LLDP) vlan.register_packet_type(slow.slow, ether.ETH_TYPE_SLOW) vlan.register_packet_type(llc.llc, ether.ETH_TYPE_IEEE802_3) +vlan.register_packet_type(cfm.cfm, ether.ETH_TYPE_CFM) svlan.register_packet_type(vlan, ether.ETH_TYPE_8021Q) svlan.register_packet_type(pbb.itag, ether.ETH_TYPE_8021AH) diff --git a/ryu/ofproto/ether.py b/ryu/ofproto/ether.py index f17f7be6..bfa78995 100644 --- a/ryu/ofproto/ether.py +++ b/ryu/ofproto/ether.py @@ -24,3 +24,4 @@ ETH_TYPE_8021AD = 0x88a8 ETH_TYPE_LLDP = 0x88cc ETH_TYPE_8021AH = 0x88e7 ETH_TYPE_IEEE802_3 = 0x05dc +ETH_TYPE_CFM = 0x8902 diff --git a/ryu/tests/unit/packet/test_cfm.py b/ryu/tests/unit/packet/test_cfm.py new file mode 100644 index 00000000..49207e92 --- /dev/null +++ b/ryu/tests/unit/packet/test_cfm.py @@ -0,0 +1,1760 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect +import struct + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import addrconv +from ryu.lib.packet import cfm + +LOG = logging.getLogger(__name__) + + +class Test_cfm(unittest.TestCase): + + def setUp(self): + self.message = cfm.cc_message() + self.ins = cfm.cfm(self.message) + data = bytearray() + prev = None + self.buf = self.ins.serialize(data, prev) + + def setUp_cc_message(self): + self.cc_message_md_lv = 1 + self.cc_message_version = 1 + self.cc_message_rdi = 1 + self.cc_message_interval = 1 + self.cc_message_seq_num = 123 + self.cc_message_mep_id = 4 + self.cc_message_md_name_format = 4 + self.cc_message_md_name_length = 0 + self.cc_message_md_name = "hoge" + self.cc_message_short_ma_name_format = 2 + self.cc_message_short_ma_name_length = 0 + self.cc_message_short_ma_name = "pakeratta" + self.cc_message_md_name_txfcf = 11 + self.cc_message_md_name_rxfcb = 22 + self.cc_message_md_name_txfcb = 33 + self.cc_message_tlvs = [ + cfm.sender_id_tlv(), + cfm.port_status_tlv(), + cfm.data_tlv(), + cfm.interface_status_tlv(), + cfm.reply_ingress_tlv(), + cfm.reply_egress_tlv(), + cfm.ltm_egress_identifier_tlv(), + cfm.ltr_egress_identifier_tlv(), + cfm.organization_specific_tlv(), + ] + self.message = cfm.cc_message( + self.cc_message_md_lv, + self.cc_message_version, + self.cc_message_rdi, + self.cc_message_interval, + self.cc_message_seq_num, + self.cc_message_mep_id, + self.cc_message_md_name_format, + self.cc_message_md_name_length, + self.cc_message_md_name, + self.cc_message_short_ma_name_format, + self.cc_message_short_ma_name_length, + self.cc_message_short_ma_name, + self.cc_message_tlvs + ) + self.ins = cfm.cfm(self.message) + data = bytearray() + prev = None + self.buf = self.ins.serialize(data, prev) + + def setUp_loopback_message(self): + self.loopback_message_md_lv = 1 + self.loopback_message_version = 1 + self.loopback_message_transaction_id = 12345 + self.loopback_message_tlvs = [ + cfm.sender_id_tlv(), + cfm.port_status_tlv(), + cfm.data_tlv(), + cfm.interface_status_tlv(), + cfm.reply_ingress_tlv(), + cfm.reply_egress_tlv(), + cfm.ltm_egress_identifier_tlv(), + cfm.ltr_egress_identifier_tlv(), + cfm.organization_specific_tlv(), + ] + self.message = cfm.loopback_message( + self.loopback_message_md_lv, + self.loopback_message_version, + self.loopback_message_transaction_id, + self.loopback_message_tlvs) + self.ins = cfm.cfm(self.message) + data = bytearray() + prev = None + self.buf = self.ins.serialize(data, prev) + + def setUp_loopback_reply(self): + self.loopback_reply_md_lv = 1 + self.loopback_reply_version = 1 + self.loopback_reply_transaction_id = 12345 + self.loopback_reply_tlvs = [ + cfm.sender_id_tlv(), + cfm.port_status_tlv(), + cfm.data_tlv(), + cfm.interface_status_tlv(), + cfm.reply_ingress_tlv(), + cfm.reply_egress_tlv(), + cfm.ltm_egress_identifier_tlv(), + cfm.ltr_egress_identifier_tlv(), + cfm.organization_specific_tlv(), + ] + self.message = cfm.loopback_reply( + self.loopback_reply_md_lv, + self.loopback_reply_version, + self.loopback_reply_transaction_id, + self.loopback_reply_tlvs) + self.ins = cfm.cfm(self.message) + data = bytearray() + prev = None + self.buf = self.ins.serialize(data, prev) + + def setUp_link_trace_message(self): + self.link_trace_message_md_lv = 1 + self.link_trace_message_version = 1 + self.link_trace_message_use_fdb_only = 1 + self.link_trace_message_transaction_id = 12345 + self.link_trace_message_ttl = 123 + self.link_trace_message_ltm_orig_addr = '11:22:33:44:55:66' + self.link_trace_message_ltm_targ_addr = '77:88:99:aa:cc:dd' + self.link_trace_message_tlvs = [ + cfm.sender_id_tlv(), + cfm.port_status_tlv(), + cfm.data_tlv(), + cfm.interface_status_tlv(), + cfm.reply_ingress_tlv(), + cfm.reply_egress_tlv(), + cfm.ltm_egress_identifier_tlv(), + cfm.ltr_egress_identifier_tlv(), + cfm.organization_specific_tlv(), + ] + self.message = cfm.link_trace_message( + self.link_trace_message_md_lv, + self.link_trace_message_version, + self.link_trace_message_use_fdb_only, + self.link_trace_message_transaction_id, + self.link_trace_message_ttl, + self.link_trace_message_ltm_orig_addr, + self.link_trace_message_ltm_targ_addr, + self.link_trace_message_tlvs) + self.ins = cfm.cfm(self.message) + data = bytearray() + prev = None + self.buf = self.ins.serialize(data, prev) + + def setUp_link_trace_reply(self): + self.link_trace_reply_md_lv = 1 + self.link_trace_reply_version = 1 + self.link_trace_reply_use_fdb_only = 1 + self.link_trace_reply_fwd_yes = 0 + self.link_trace_reply_terminal_mep = 1 + self.link_trace_reply_transaction_id = 5432 + self.link_trace_reply_ttl = 123 + self.link_trace_reply_relay_action = 3 + self.link_trace_reply_tlvs = [ + cfm.sender_id_tlv(), + cfm.port_status_tlv(), + cfm.data_tlv(), + cfm.interface_status_tlv(), + cfm.reply_ingress_tlv(), + cfm.reply_egress_tlv(), + cfm.ltm_egress_identifier_tlv(), + cfm.ltr_egress_identifier_tlv(), + cfm.organization_specific_tlv(), + ] + self.message = cfm.link_trace_reply( + self.link_trace_reply_md_lv, + self.link_trace_reply_version, + self.link_trace_reply_use_fdb_only, + self.link_trace_reply_fwd_yes, + self.link_trace_reply_terminal_mep, + self.link_trace_reply_transaction_id, + self.link_trace_reply_ttl, + self.link_trace_reply_relay_action, + self.link_trace_reply_tlvs) + self.ins = cfm.cfm(self.message) + data = bytearray() + prev = None + self.buf = self.ins.serialize(data, prev) + + def tearDown(self): + pass + + def test_init(self): + eq_(str(self.message), str(self.ins.op)) + + def test_init_cc_message(self): + self.setUp_cc_message() + self.test_init() + + def test_init_loopback_message(self): + self.setUp_loopback_message() + self.test_init() + + def test_init_loopback_reply(self): + self.setUp_loopback_reply() + self.test_init() + + def test_init_link_trace_message(self): + self.setUp_link_trace_message() + self.test_init() + + def test_init_link_trace_reply(self): + self.setUp_link_trace_reply() + self.test_init() + + def test_parser(self): + _res = self.ins.parser(str(self.buf)) + + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(str(self.message), str(res.op)) + + def test_parser_with_cc_message(self): + self.setUp_cc_message() + self.test_parser() + + def test_parser_with_loopback_message(self): + self.setUp_loopback_message() + self.test_parser() + + def test_parser_with_loopback_reply(self): + self.setUp_loopback_reply() + self.test_parser() + + def test_parser_with_link_trace_message(self): + self.setUp_link_trace_message() + self.test_parser() + + def test_parser_with_link_trace_reply(self): + self.setUp_link_trace_reply() + self.test_parser() + + def test_serialize(self): + pass + + def test_serialize_with_cc_message(self): + self.setUp_cc_message() + self.test_serialize() + data = bytearray() + prev = None + buf = self.ins.serialize(data, prev) + cc_message = cfm.cc_message.parser(str(buf)) + eq_(repr(self.message), repr(cc_message)) + + def test_serialize_with_loopback_message(self): + self.setUp_loopback_message() + self.test_serialize() + data = bytearray() + prev = None + buf = self.ins.serialize(data, prev) + loopback_message = cfm.loopback_message.parser(str(buf)) + eq_(repr(self.message), repr(loopback_message)) + + def test_serialize_with_loopback_reply(self): + self.setUp_loopback_reply() + self.test_serialize() + data = bytearray() + prev = None + buf = self.ins.serialize(data, prev) + loopback_reply = cfm.loopback_reply.parser(str(buf)) + eq_(repr(self.message), repr(loopback_reply)) + + def test_serialize_with_link_trace_message(self): + self.setUp_link_trace_message() + self.test_serialize() + data = bytearray() + prev = None + buf = self.ins.serialize(data, prev) + link_trace_message = cfm.link_trace_message.parser(str(buf)) + eq_(repr(self.message), repr(link_trace_message)) + + def test_serialize_with_link_trace_reply(self): + self.setUp_link_trace_reply() + self.test_serialize() + data = bytearray() + prev = None + buf = self.ins.serialize(data, prev) + link_trace_reply = cfm.link_trace_reply.parser(str(buf)) + eq_(repr(self.message), repr(link_trace_reply)) + + def test_to_string(self): + cfm_values = {'op': self.message} + _cfm_str = ','.join(['%s=%s' % (k, cfm_values[k]) + for k, v in inspect.getmembers(self.ins) + if k in cfm_values]) + cfm_str = '%s(%s)' % (cfm.cfm.__name__, _cfm_str) + eq_(str(self.ins), cfm_str) + eq_(repr(self.ins), cfm_str) + + def test_to_string_cc_message(self): + self.setUp_cc_message() + self.test_to_string() + + def test_to_string_loopback_message(self): + self.setUp_loopback_message() + self.test_to_string() + + def test_to_string_loopback_reply(self): + self.setUp_loopback_reply() + self.test_to_string() + + def test_to_string_link_trace_message(self): + self.setUp_link_trace_message() + self.test_to_string() + + def test_to_string_link_trace_reply(self): + self.setUp_link_trace_reply() + self.test_to_string() + + def test_len(self): + pass + + def test_len_cc_message(self): + self.setUp_cc_message() + eq_(len(self.ins), 0 + len(self.message)) + + def test_len_loopback_message(self): + self.setUp_loopback_message() + eq_(len(self.ins), 0 + len(self.message)) + + def test_len_loopback_reply(self): + self.setUp_loopback_reply() + eq_(len(self.ins), 0 + len(self.message)) + + def test_len_link_trace_message(self): + self.setUp_link_trace_message() + eq_(len(self.ins), 0 + len(self.message)) + + def test_len_link_trace_reply(self): + self.setUp_link_trace_reply() + eq_(len(self.ins), 0 + len(self.message)) + + def test_default_args(self): + pass + + def test_json(self): + jsondict = self.ins.to_jsondict() + ins = cfm.cfm.from_jsondict(jsondict['cfm']) + eq_(str(self.ins), str(ins)) + + def test_json_with_cc_message(self): + self.setUp_cc_message() + self.test_json() + + def test_json_with_loopback_message(self): + self.setUp_loopback_message() + self.test_json() + + def test_json_with_loopback_reply(self): + self.setUp_loopback_reply() + self.test_json() + + def test_json_with_link_trace_message(self): + self.setUp_link_trace_message() + self.test_json() + + def test_json_with_link_trace_reply(self): + self.setUp_link_trace_reply() + self.test_json() + + +class Test_cc_message(unittest.TestCase): + + def setUp(self): + self.md_lv = 1 + self.version = 1 + self.opcode = cfm.CFM_CC_MESSAGE + self.rdi = 1 + self.interval = 5 + self.first_tlv_offset = cfm.cc_message._TLV_OFFSET + self.seq_num = 2 + self.mep_id = 2 + self.md_name_format = cfm.cc_message._MD_FMT_CHARACTER_STRING + self.md_name_length = 3 + self.md_name = "foo" + self.short_ma_name_format = 2 + self.short_ma_name_length = 8 + self.short_ma_name = "hogehoge" + self.tlvs = [ + ] + self.end_tlv = 0 + self.ins = cfm.cc_message( + self.md_lv, + self.version, + self.rdi, + self.interval, + self.seq_num, + self.mep_id, + self.md_name_format, + self.md_name_length, + self.md_name, + self.short_ma_name_format, + self.short_ma_name_length, + self.short_ma_name, + self.tlvs + ) + + self.form = '!4BIH2B3s2B8s33x12x4xB' + self.buf = struct.pack( + self.form, + (self.md_lv << 5) | self.version, + self.opcode, + (self.rdi << 7) | self.interval, + self.first_tlv_offset, + self.seq_num, + self.mep_id, + self.md_name_format, + self.md_name_length, + self.md_name, + self.short_ma_name_format, + self.short_ma_name_length, + self.short_ma_name, + self.end_tlv + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.md_lv, self.ins.md_lv) + eq_(self.version, self.ins.version) + eq_(self.rdi, self.ins.rdi) + eq_(self.interval, self.ins.interval) + eq_(self.seq_num, self.ins.seq_num) + eq_(self.mep_id, self.ins.mep_id) + eq_(self.md_name_format, self.ins.md_name_format) + eq_(self.md_name_length, self.ins.md_name_length) + eq_(self.md_name, self.ins.md_name) + eq_(self.short_ma_name_format, self.ins.short_ma_name_format) + eq_(self.short_ma_name_length, self.ins.short_ma_name_length) + eq_(self.short_ma_name, self.ins.short_ma_name) + eq_(self.tlvs, self.ins.tlvs) + + def test_parser(self): + _res = cfm.cc_message.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.md_lv, res.md_lv) + eq_(self.version, res.version) + eq_(self.rdi, res.rdi) + eq_(self.interval, res.interval) + eq_(self.seq_num, res.seq_num) + eq_(self.mep_id, res.mep_id) + eq_(self.md_name_format, res.md_name_format) + eq_(self.md_name_length, res.md_name_length) + eq_(self.md_name, res.md_name) + eq_(self.short_ma_name_format, res.short_ma_name_format) + eq_(self.short_ma_name_length, res.short_ma_name_length) + eq_(self.short_ma_name, res.short_ma_name) + eq_(self.tlvs, res.tlvs) + + def test_parser_with_no_maintenance_domain_name_present(self): + form = '!4BIH3B8s37x12x4xB' + buf = struct.pack( + form, + (self.md_lv << 5) | self.version, + self.opcode, + (self.rdi << 7) | self.interval, + self.first_tlv_offset, + self.seq_num, + self.mep_id, + cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, + self.short_ma_name_format, + self.short_ma_name_length, + self.short_ma_name, + self.end_tlv + ) + _res = cfm.cc_message.parser(buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.md_lv, res.md_lv) + eq_(self.version, res.version) + eq_(self.rdi, res.rdi) + eq_(self.interval, res.interval) + eq_(self.seq_num, res.seq_num) + eq_(self.mep_id, res.mep_id) + eq_(cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, res.md_name_format) + eq_(self.short_ma_name_format, res.short_ma_name_format) + eq_(self.short_ma_name_length, res.short_ma_name_length) + eq_(self.short_ma_name, res.short_ma_name) + eq_(self.tlvs, res.tlvs) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.rdi, res[2] >> 7) + eq_(self.interval, res[2] & 0x07) + eq_(self.first_tlv_offset, res[3]) + eq_(self.seq_num, res[4]) + eq_(self.mep_id, res[5]) + eq_(self.md_name_format, res[6]) + eq_(self.md_name_length, res[7]) + eq_(self.md_name, res[8]) + eq_(self.short_ma_name_format, res[9]) + eq_(self.short_ma_name_length, res[10]) + eq_(self.short_ma_name, res[11]) + eq_(self.end_tlv, res[12]) + + def test_serialize_with_md_name_length_zero(self): + ins = cfm.cc_message( + self.md_lv, + self.version, + self.rdi, + self.interval, + self.seq_num, + self.mep_id, + self.md_name_format, + 0, + self.md_name, + self.short_ma_name_format, + 0, + self.short_ma_name, + self.tlvs + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.rdi, res[2] >> 7) + eq_(self.interval, res[2] & 0x07) + eq_(self.first_tlv_offset, res[3]) + eq_(self.seq_num, res[4]) + eq_(self.mep_id, res[5]) + eq_(self.md_name_format, res[6]) + eq_(self.md_name_length, res[7]) + eq_(self.md_name, res[8]) + eq_(self.short_ma_name_format, res[9]) + eq_(self.short_ma_name_length, res[10]) + eq_(self.short_ma_name, res[11]) + eq_(self.end_tlv, res[12]) + + def test_serialize_with_no_maintenance_domain_name_present(self): + form = '!4BIH3B8s37x12x4xB' + ins = cfm.cc_message( + self.md_lv, + self.version, + self.rdi, + self.interval, + self.seq_num, + self.mep_id, + cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, + 0, + self.md_name, + self.short_ma_name_format, + 0, + self.short_ma_name, + self.tlvs + ) + buf = ins.serialize() + res = struct.unpack_from(form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.rdi, res[2] >> 7) + eq_(self.interval, res[2] & 0x07) + eq_(self.first_tlv_offset, res[3]) + eq_(self.seq_num, res[4]) + eq_(self.mep_id, res[5]) + eq_(cfm.cc_message._MD_FMT_NO_MD_NAME_PRESENT, res[6]) + eq_(self.short_ma_name_format, res[7]) + eq_(self.short_ma_name_length, res[8]) + eq_(self.short_ma_name, res[9]) + eq_(self.end_tlv, res[10]) + + def test_len(self): + # 75 octet (If tlv does not exist) + eq_(75, len(self.ins)) + + def test_default_args(self): + ins = cfm.cc_message() + buf = ins.serialize() + res = struct.unpack_from(cfm.cc_message._PACK_STR, str(buf)) + eq_(res[0] >> 5, 0) + eq_(res[0] & 0x1f, 0) + eq_(res[1], 1) + eq_(res[2] >> 7, 0) + eq_(res[2] & 0x07, 4) + eq_(res[3], 70) + eq_(res[4], 0) + eq_(res[5], 1) + eq_(res[6], 4) + + +class Test_loopback_message(unittest.TestCase): + + def setUp(self): + self.md_lv = 1 + self.version = 1 + self.opcode = cfm.CFM_LOOPBACK_MESSAGE + self.flags = 0 + self.first_tlv_offset = cfm.loopback_message._TLV_OFFSET + self.transaction_id = 12345 + self.tlvs = [ + ] + + self.end_tlv = 0 + self.ins = cfm.loopback_message( + self.md_lv, + self.version, + self.transaction_id, + self.tlvs, + ) + self.form = '!4BIB' + self.buf = struct.pack( + self.form, + (self.md_lv << 5) | self.version, + self.opcode, + self.flags, + self.first_tlv_offset, + self.transaction_id, + self.end_tlv + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.md_lv, self.ins.md_lv) + eq_(self.version, self.ins.version) + eq_(self.transaction_id, self.ins.transaction_id) + eq_(self.tlvs, self.ins.tlvs) + + def test_parser(self): + _res = cfm.loopback_message.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.md_lv, res.md_lv) + eq_(self.version, res.version) + eq_(self.transaction_id, res.transaction_id) + eq_(self.tlvs, res.tlvs) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.flags, res[2]) + eq_(self.first_tlv_offset, res[3]) + eq_(self.transaction_id, res[4]) + eq_(self.end_tlv, res[5]) + + def test_len(self): + # 9 octet (If tlv does not exist) + eq_(9, len(self.ins)) + + def test_default_args(self): + ins = cfm.loopback_message() + buf = ins.serialize() + res = struct.unpack_from(cfm.loopback_message._PACK_STR, + str(buf)) + eq_(res[0] >> 5, 0) + eq_(res[0] & 0x1f, 0) + eq_(res[1], 3) + eq_(res[2], 0) + eq_(res[3], 4) + eq_(res[4], 0) + + +class Test_loopback_reply(unittest.TestCase): + + def setUp(self): + self.md_lv = 1 + self.version = 1 + self.opcode = cfm.CFM_LOOPBACK_REPLY + self.flags = 0 + self.first_tlv_offset = cfm.loopback_reply._TLV_OFFSET + self.transaction_id = 12345 + self.tlvs = [ + ] + self.end_tlv = 0 + self.ins = cfm.loopback_reply( + self.md_lv, + self.version, + self.transaction_id, + self.tlvs, + ) + self.form = '!4BIB' + self.buf = struct.pack( + self.form, + (self.md_lv << 5) | self.version, + self.opcode, + self.flags, + self.first_tlv_offset, + self.transaction_id, + self.end_tlv + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.md_lv, self.ins.md_lv) + eq_(self.version, self.ins.version) + eq_(self.transaction_id, self.ins.transaction_id) + eq_(self.tlvs, self.ins.tlvs) + + def test_parser(self): + _res = cfm.loopback_reply.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.md_lv, res.md_lv) + eq_(self.version, res.version) + eq_(self.transaction_id, res.transaction_id) + eq_(self.tlvs, res.tlvs) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.flags, res[2]) + eq_(self.first_tlv_offset, res[3]) + eq_(self.transaction_id, res[4]) + eq_(self.end_tlv, res[5]) + + def test_len(self): + # 9 octet (If tlv does not exist) + eq_(9, len(self.ins)) + + def test_default_args(self): + ins = cfm.loopback_reply() + buf = ins.serialize() + res = struct.unpack_from(cfm.loopback_reply._PACK_STR, str(buf)) + eq_(res[0] >> 5, 0) + eq_(res[0] & 0x1f, 0) + eq_(res[1], 2) + eq_(res[2], 0) + eq_(res[3], 4) + eq_(res[4], 0) + + +class Test_link_trace_message(unittest.TestCase): + + def setUp(self): + self.md_lv = 1 + self.version = 1 + self.opcode = cfm.CFM_LINK_TRACE_MESSAGE + self.use_fdb_only = 1 + self.first_tlv_offset = cfm.link_trace_message._TLV_OFFSET + self.transaction_id = 12345 + self.ttl = 55 + self.ltm_orig_addr = "00:11:22:44:55:66" + self.ltm_targ_addr = "ab:cd:ef:23:12:65" + self.tlvs = [ + ] + + self.end_tlv = 0 + self.ins = cfm.link_trace_message( + self.md_lv, + self.version, + self.use_fdb_only, + self.transaction_id, + self.ttl, + self.ltm_orig_addr, + self.ltm_targ_addr, + self.tlvs + ) + self.form = '!4BIB6s6sB' + self.buf = struct.pack( + self.form, + (self.md_lv << 5) | self.version, + self.opcode, + self.use_fdb_only << 7, + self.first_tlv_offset, + self.transaction_id, + self.ttl, + addrconv.mac.text_to_bin(self.ltm_orig_addr), + addrconv.mac.text_to_bin(self.ltm_targ_addr), + self.end_tlv + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.md_lv, self.ins.md_lv) + eq_(self.version, self.ins.version) + eq_(self.use_fdb_only, self.ins.use_fdb_only) + eq_(self.transaction_id, self.ins.transaction_id) + eq_(self.ttl, self.ins.ttl) + eq_(self.ltm_orig_addr, self.ins.ltm_orig_addr) + eq_(self.ltm_targ_addr, self.ins.ltm_targ_addr) + eq_(self.tlvs, self.ins.tlvs) + + def test_parser(self): + _res = cfm.link_trace_message.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.md_lv, res.md_lv) + eq_(self.version, res.version) + eq_(self.use_fdb_only, res.use_fdb_only) + eq_(self.transaction_id, res.transaction_id) + eq_(self.ttl, res.ttl) + eq_(self.ltm_orig_addr, res.ltm_orig_addr) + eq_(self.ltm_targ_addr, res.ltm_targ_addr) + eq_(self.tlvs, res.tlvs) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.use_fdb_only, res[2] >> 7) + eq_(self.first_tlv_offset, res[3]) + eq_(self.transaction_id, res[4]) + eq_(self.ttl, res[5]) + eq_(addrconv.mac.text_to_bin(self.ltm_orig_addr), res[6]) + eq_(addrconv.mac.text_to_bin(self.ltm_targ_addr), res[7]) + eq_(self.end_tlv, res[8]) + + def test_len(self): + # 22 octet (If tlv does not exist) + eq_(22, len(self.ins)) + + def test_default_args(self): + ins = cfm.link_trace_message() + buf = ins.serialize() + res = struct.unpack_from(cfm.link_trace_message._PACK_STR, str(buf)) + eq_(res[0] >> 5, 0) + eq_(res[0] & 0x1f, 0) + eq_(res[1], 5) + eq_(res[2] >> 7, 1) + eq_(res[3], 17) + eq_(res[4], 0) + eq_(res[5], 64) + eq_(res[6], addrconv.mac.text_to_bin('00:00:00:00:00:00')) + eq_(res[7], addrconv.mac.text_to_bin('00:00:00:00:00:00')) + + +class Test_link_trace_reply(unittest.TestCase): + + def setUp(self): + self.md_lv = 1 + self.version = 1 + self.opcode = cfm.CFM_LINK_TRACE_REPLY + self.use_fdb_only = 1 + self.fwd_yes = 0 + self.terminal_mep = 1 + self.first_tlv_offset = cfm.link_trace_reply._TLV_OFFSET + self.transaction_id = 12345 + self.ttl = 55 + self.relay_action = 2 + self.ltm_orig_addr = "00:11:22:aa:bb:cc" + self.ltm_targ_addr = "53:45:24:64:ac:ff" + self.tlvs = [ + ] + self.end_tlv = 0 + self.ins = cfm.link_trace_reply( + self.md_lv, + self.version, + self.use_fdb_only, + self.fwd_yes, + self.terminal_mep, + self.transaction_id, + self.ttl, + self.relay_action, + self.tlvs, + ) + self.form = '!4BIBBB' + self.buf = struct.pack( + self.form, + (self.md_lv << 5) | self.version, + self.opcode, + (self.use_fdb_only << 7) | (self.fwd_yes << 6) | + (self.terminal_mep << 5), + self.first_tlv_offset, + self.transaction_id, + self.ttl, + self.relay_action, + self.end_tlv + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.md_lv, self.ins.md_lv) + eq_(self.version, self.ins.version) + eq_(self.use_fdb_only, self.ins.use_fdb_only) + eq_(self.fwd_yes, self.ins.fwd_yes) + eq_(self.terminal_mep, self.ins.terminal_mep) + eq_(self.transaction_id, self.ins.transaction_id) + eq_(self.ttl, self.ins.ttl) + eq_(self.relay_action, self.ins.relay_action) + eq_(self.tlvs, self.ins.tlvs) + + def test_parser(self): + _res = cfm.link_trace_reply.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.md_lv, res.md_lv) + eq_(self.version, res.version) + eq_(self.use_fdb_only, self.ins.use_fdb_only) + eq_(self.fwd_yes, self.ins.fwd_yes) + eq_(self.terminal_mep, self.ins.terminal_mep) + eq_(self.transaction_id, res.transaction_id) + eq_(self.ttl, res.ttl) + eq_(self.relay_action, res.relay_action) + eq_(self.tlvs, res.tlvs) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.md_lv, res[0] >> 5) + eq_(self.version, res[0] & 0x1f) + eq_(self.opcode, res[1]) + eq_(self.use_fdb_only, res[2] >> 7 & 0x01) + eq_(self.fwd_yes, res[2] >> 6 & 0x01) + eq_(self.terminal_mep, res[2] >> 5 & 0x01) + eq_(self.first_tlv_offset, res[3]) + eq_(self.transaction_id, res[4]) + eq_(self.ttl, res[5]) + eq_(self.relay_action, res[6]) + eq_(self.end_tlv, res[7]) + + def test_len(self): + # 11 octet (If tlv does not exist) + eq_(11, len(self.ins)) + + def test_default_args(self): + ins = cfm.link_trace_reply() + buf = ins.serialize() + res = struct.unpack_from(cfm.link_trace_reply._PACK_STR, str(buf)) + eq_(res[0] >> 5, 0) + eq_(res[0] & 0x1f, 0) + eq_(res[1], 4) + eq_(res[2] >> 7, 1) + eq_(res[2] >> 6 & 0x01, 0) + eq_(res[2] >> 5 & 0x01, 1) + eq_(res[3], 6) + eq_(res[4], 0) + eq_(res[5], 64) + eq_(res[6], 1) + + +class Test_sender_id_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_SENDER_ID_TLV + self.length = 10 + self.chassis_id_length = 1 + self.chassis_id_subtype = 3 + self.chassis_id = "\x0a" + self.ma_domain_length = 2 + self.ma_domain = "\x04\x05" + self.ma_length = 3 + self.ma = "\x01\x02\x03" + self.ins = cfm.sender_id_tlv( + self.length, + self.chassis_id_length, + self.chassis_id_subtype, + self.chassis_id, + self.ma_domain_length, + self.ma_domain, + self.ma_length, + self.ma, + ) + self.form = '!BHBB1sB2sB3s' + self.buf = struct.pack( + self.form, + self._type, + self.length, + self.chassis_id_length, + self.chassis_id_subtype, + self.chassis_id, + self.ma_domain_length, + self.ma_domain, + self.ma_length, + self.ma + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.chassis_id_length, self.ins.chassis_id_length) + eq_(self.chassis_id_subtype, self.ins.chassis_id_subtype) + eq_(self.chassis_id, self.ins.chassis_id) + eq_(self.ma_domain_length, self.ins.ma_domain_length) + eq_(self.ma_domain, self.ins.ma_domain) + eq_(self.ma_length, self.ins.ma_length) + eq_(self.ma, self.ins.ma) + + def test_parser(self): + _res = cfm.sender_id_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.chassis_id_length, res.chassis_id_length) + eq_(self.chassis_id_subtype, res.chassis_id_subtype) + eq_(self.chassis_id, res.chassis_id) + eq_(self.ma_domain_length, res.ma_domain_length) + eq_(self.ma_domain, res.ma_domain) + eq_(self.ma_length, res.ma_length) + eq_(self.ma, res.ma) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.chassis_id_length, res[2]) + eq_(self.chassis_id_subtype, res[3]) + eq_(self.chassis_id, res[4]) + eq_(self.ma_domain_length, res[5]) + eq_(self.ma_domain, res[6]) + eq_(self.ma_length, res[7]) + eq_(self.ma, res[8]) + + def test_serialize_semi_normal_ptn1(self): + ins = cfm.sender_id_tlv( + chassis_id_subtype=self.chassis_id_subtype, + chassis_id=self.chassis_id, + ma_domain=self.ma_domain, + ) + buf = ins.serialize() + form = '!BHBB1sB2sB' + res = struct.unpack_from(form, str(buf)) + eq_(self._type, res[0]) + eq_(7, res[1]) + eq_(self.chassis_id_length, res[2]) + eq_(self.chassis_id_subtype, res[3]) + eq_(self.chassis_id, res[4]) + eq_(self.ma_domain_length, res[5]) + eq_(self.ma_domain, res[6]) + eq_(0, res[7]) + + def test_serialize_semi_normal_ptn2(self): + ins = cfm.sender_id_tlv( + ma_domain=self.ma_domain, + ma=self.ma, + ) + buf = ins.serialize() + form = '!BHBB2sB3s' + res = struct.unpack_from(form, str(buf)) + eq_(self._type, res[0]) + eq_(8, res[1]) + eq_(0, res[2]) + eq_(self.ma_domain_length, res[3]) + eq_(self.ma_domain, res[4]) + eq_(self.ma_length, res[5]) + eq_(self.ma, res[6]) + + def test_serialize_semi_normal_ptn3(self): + ins = cfm.sender_id_tlv( + chassis_id_subtype=self.chassis_id_subtype, + chassis_id=self.chassis_id, + ) + buf = ins.serialize() + form = '!BHBB1sB' + res = struct.unpack_from(form, str(buf)) + eq_(self._type, res[0]) + eq_(4, res[1]) + eq_(self.chassis_id_length, res[2]) + eq_(self.chassis_id_subtype, res[3]) + eq_(self.chassis_id, res[4]) + eq_(0, res[5]) + + def test_serialize_semi_normal_ptn4(self): + ins = cfm.sender_id_tlv( + ma_domain=self.ma_domain, + ) + buf = ins.serialize() + form = '!BHBB2sB' + res = struct.unpack_from(form, str(buf)) + eq_(self._type, res[0]) + eq_(5, res[1]) + eq_(0, res[2]) + eq_(self.ma_domain_length, res[3]) + eq_(self.ma_domain, res[4]) + eq_(0, res[5]) + + def test_serialize_with_length_zero(self): + ins = cfm.sender_id_tlv( + 0, + 0, + self.chassis_id_subtype, + self.chassis_id, + 0, + self.ma_domain, + 0, + self.ma, + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.chassis_id_length, res[2]) + eq_(self.chassis_id_subtype, res[3]) + eq_(self.chassis_id, res[4]) + eq_(self.ma_domain_length, res[5]) + eq_(self.ma_domain, res[6]) + eq_(self.ma_length, res[7]) + eq_(self.ma, res[8]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 10, len(self.ins)) + + def test_default_args(self): + ins = cfm.sender_id_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.sender_id_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_SENDER_ID_TLV) + eq_(res[1], 1) + eq_(res[2], 0) + + +class Test_port_status_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_PORT_STATUS_TLV + self.length = 1 + self.port_status = 1 + self.ins = cfm.port_status_tlv( + self.length, + self.port_status + ) + self.form = '!BHB' + self.buf = struct.pack( + self.form, + self._type, + self.length, + self.port_status + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.port_status, self.ins.port_status) + + def test_parser(self): + _res = cfm.port_status_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.port_status, res.port_status) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.port_status, res[2]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 1, len(self.ins)) + + def test_default_args(self): + ins = cfm.port_status_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.port_status_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_PORT_STATUS_TLV) + eq_(res[1], 1) + eq_(res[2], 2) + + +class Test_data_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_DATA_TLV + self.length = 3 + self.data_value = "\x01\x02\x03" + self.ins = cfm.data_tlv( + self.length, + self.data_value + ) + self.form = '!BH3s' + self.buf = struct.pack( + self.form, + self._type, + self.length, + self.data_value, + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.data_value, self.ins.data_value) + + def test_parser(self): + _res = cfm.data_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.data_value, res.data_value) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.data_value, res[2]) + + def test_serialize_with_length_zero(self): + ins = cfm.data_tlv( + 0, + self.data_value + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.data_value, res[2]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 3, len(self.ins)) + + def test_default_args(self): + ins = cfm.data_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.data_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_DATA_TLV) + eq_(res[1], 0) + + +class Test_interface_status_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_INTERFACE_STATUS_TLV + self.length = 1 + self.interface_status = 4 + self.ins = cfm.interface_status_tlv( + self.length, + self.interface_status + ) + self.form = '!BHB' + self.buf = struct.pack( + self.form, + self._type, + self.length, + self.interface_status + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.interface_status, self.ins.interface_status) + + def test_parser(self): + _res = cfm.interface_status_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.interface_status, res.interface_status) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.interface_status, res[2]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 1, len(self.ins)) + + def test_default_args(self): + ins = cfm.interface_status_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.interface_status_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_INTERFACE_STATUS_TLV) + eq_(res[1], 1) + eq_(res[2], 1) + + +class Test_ltm_egress_identifier_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_LTM_EGRESS_IDENTIFIER_TLV + self.length = 8 + self.egress_id_ui = 7 + self.egress_id_mac = "11:22:33:44:55:66" + self.ins = cfm.ltm_egress_identifier_tlv( + self.length, + self.egress_id_ui, + self.egress_id_mac + ) + self.form = '!BHH6s' + self.buf = struct.pack( + self.form, + self._type, + self.length, + self.egress_id_ui, + addrconv.mac.text_to_bin(self.egress_id_mac) + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.egress_id_ui, self.ins.egress_id_ui) + eq_(self.egress_id_mac, self.ins.egress_id_mac) + + def test_parser(self): + _res = cfm.ltm_egress_identifier_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.egress_id_ui, res.egress_id_ui) + eq_(self.egress_id_mac, res.egress_id_mac) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.egress_id_ui, res[2]) + eq_(addrconv.mac.text_to_bin(self.egress_id_mac), res[3]) + + def test_serialize_with_length_zero(self): + ins = cfm.ltm_egress_identifier_tlv( + 0, + self.egress_id_ui, + self.egress_id_mac + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.egress_id_ui, res[2]) + eq_(addrconv.mac.text_to_bin(self.egress_id_mac), res[3]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 8, len(self.ins)) + + def test_default_args(self): + ins = cfm.ltm_egress_identifier_tlv() + buf = ins.serialize() + res = struct.unpack_from( + cfm.ltm_egress_identifier_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_LTM_EGRESS_IDENTIFIER_TLV) + eq_(res[1], 8) + eq_(res[2], 0) + eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00')) + + +class Test_ltr_egress_identifier_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_LTR_EGRESS_IDENTIFIER_TLV + self.length = 16 + self.last_egress_id_ui = 7 + self.last_egress_id_mac = "11:22:33:44:55:66" + self.next_egress_id_ui = 5 + self.next_egress_id_mac = "33:11:33:aa:bb:cc" + self.ins = cfm.ltr_egress_identifier_tlv(self.length, + self.last_egress_id_ui, + self.last_egress_id_mac, + self.next_egress_id_ui, + self.next_egress_id_mac + ) + self.form = '!BHH6sH6s' + self.buf = struct.pack( + self.form, + self._type, + self.length, + self.last_egress_id_ui, + addrconv.mac.text_to_bin(self.last_egress_id_mac), + self.next_egress_id_ui, + addrconv.mac.text_to_bin(self.next_egress_id_mac)) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.last_egress_id_ui, self.ins.last_egress_id_ui) + eq_(self.last_egress_id_mac, self.ins.last_egress_id_mac) + eq_(self.next_egress_id_ui, self.ins.next_egress_id_ui) + eq_(self.next_egress_id_mac, self.ins.next_egress_id_mac) + + def test_parser(self): + _res = cfm.ltr_egress_identifier_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.last_egress_id_ui, res.last_egress_id_ui) + eq_(self.last_egress_id_mac, res.last_egress_id_mac) + eq_(self.next_egress_id_ui, res.next_egress_id_ui) + eq_(self.next_egress_id_mac, res.next_egress_id_mac) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.last_egress_id_ui, res[2]) + eq_(addrconv.mac.text_to_bin(self.last_egress_id_mac), res[3]) + eq_(self.next_egress_id_ui, res[4]) + eq_(addrconv.mac.text_to_bin(self.next_egress_id_mac), res[5]) + + def test_serialize_with_length_zero(self): + ins = cfm.ltr_egress_identifier_tlv(0, + self.last_egress_id_ui, + self.last_egress_id_mac, + self.next_egress_id_ui, + self.next_egress_id_mac + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.last_egress_id_ui, res[2]) + eq_(addrconv.mac.text_to_bin(self.last_egress_id_mac), res[3]) + eq_(self.next_egress_id_ui, res[4]) + eq_(addrconv.mac.text_to_bin(self.next_egress_id_mac), res[5]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 16, len(self.ins)) + + def test_default_args(self): + ins = cfm.ltr_egress_identifier_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.ltr_egress_identifier_tlv._PACK_STR, + str(buf)) + eq_(res[0], cfm.CFM_LTR_EGRESS_IDENTIFIER_TLV) + eq_(res[1], 16) + eq_(res[2], 0) + eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00')) + eq_(res[4], 0) + eq_(res[5], addrconv.mac.text_to_bin('00:00:00:00:00:00')) + + +class Test_organization_specific_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_ORGANIZATION_SPECIFIC_TLV + self.length = 10 + self.oui = "\xff\x12\x34" + self.subtype = 3 + self.value = "\x01\x02\x0f\x0e\x0d\x0c" + self.ins = cfm.organization_specific_tlv(self.length, + self.oui, + self.subtype, + self.value + ) + self.form = '!BH3sB6s' + self.buf = struct.pack(self.form, + self._type, + self.length, + self.oui, + self.subtype, + self.value + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.oui, self.ins.oui) + eq_(self.subtype, self.ins.subtype) + eq_(self.value, self.ins.value) + + def test_parser(self): + _res = cfm.organization_specific_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.oui, res.oui) + eq_(self.subtype, res.subtype) + eq_(self.value, res.value) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.oui, res[2]) + eq_(self.subtype, res[3]) + eq_(self.value, res[4]) + + def test_serialize_with_zero(self): + ins = cfm.organization_specific_tlv(0, + self.oui, + self.subtype, + self.value + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.oui, res[2]) + eq_(self.subtype, res[3]) + eq_(self.value, res[4]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 10, len(self.ins)) + + def test_default_args(self): + ins = cfm.organization_specific_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.organization_specific_tlv._PACK_STR, + str(buf)) + eq_(res[0], cfm.CFM_ORGANIZATION_SPECIFIC_TLV) + eq_(res[1], 4) + eq_(res[2], "\x00\x00\x00") + eq_(res[3], 0) + + +class Test_reply_ingress_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_REPLY_INGRESS_TLV + self.length = 12 + self.action = 2 + self.mac_address = 'aa:bb:cc:56:34:12' + self.port_id_length = 3 + self.port_id_subtype = 2 + self.port_id = "\x01\x04\x09" + self.ins = cfm.reply_ingress_tlv(self.length, self.action, + self.mac_address, + self.port_id_length, + self.port_id_subtype, + self.port_id + ) + self.form = '!BHB6sBB3s' + self.buf = struct.pack(self.form, + self._type, + self.length, + self.action, + addrconv.mac.text_to_bin(self.mac_address), + self.port_id_length, + self.port_id_subtype, + self.port_id + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.action, self.ins.action) + eq_(self.mac_address, self.ins.mac_address) + eq_(self.port_id_length, self.ins.port_id_length) + eq_(self.port_id_subtype, self.ins.port_id_subtype) + eq_(self.port_id, self.ins.port_id) + + def test_parser(self): + _res = cfm.reply_ingress_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.action, res.action) + eq_(self.mac_address, res.mac_address) + eq_(self.port_id_length, res.port_id_length) + eq_(self.port_id_subtype, res.port_id_subtype) + eq_(self.port_id, res.port_id) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.action, res[2]) + eq_(addrconv.mac.text_to_bin(self.mac_address), res[3]) + eq_(self.port_id_length, res[4]) + eq_(self.port_id_subtype, res[5]) + eq_(self.port_id, res[6]) + + def test_serialize_with_zero(self): + ins = cfm.reply_ingress_tlv(0, + self.action, + self.mac_address, + 0, + self.port_id_subtype, + self.port_id + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.action, res[2]) + eq_(addrconv.mac.text_to_bin(self.mac_address), res[3]) + eq_(self.port_id_length, res[4]) + eq_(self.port_id_subtype, res[5]) + eq_(self.port_id, res[6]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 12, len(self.ins)) + + def test_default_args(self): + ins = cfm.reply_ingress_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.reply_ingress_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_REPLY_INGRESS_TLV) + eq_(res[1], 7) + eq_(res[2], 1) + eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00')) + + +class Test_reply_egress_tlv(unittest.TestCase): + + def setUp(self): + self._type = cfm.CFM_REPLY_EGRESS_TLV + self.length = 12 + self.action = 2 + self.mac_address = 'aa:bb:cc:56:34:12' + self.port_id_length = 3 + self.port_id_subtype = 2 + self.port_id = "\x01\x04\x09" + self.ins = cfm.reply_egress_tlv(self.length, + self.action, + self.mac_address, + self.port_id_length, + self.port_id_subtype, + self.port_id + ) + self.form = '!BHB6sBB3s' + self.buf = struct.pack(self.form, + self._type, + self.length, + self.action, + addrconv.mac.text_to_bin(self.mac_address), + self.port_id_length, + self.port_id_subtype, + self.port_id + ) + + def tearDown(self): + pass + + def test_init(self): + eq_(self.length, self.ins.length) + eq_(self.action, self.ins.action) + eq_(self.mac_address, self.ins.mac_address) + eq_(self.port_id_length, self.ins.port_id_length) + eq_(self.port_id_subtype, self.ins.port_id_subtype) + eq_(self.port_id, self.ins.port_id) + + def test_parser(self): + _res = cfm.reply_ingress_tlv.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.length, res.length) + eq_(self.action, res.action) + eq_(self.mac_address, res.mac_address) + eq_(self.port_id_length, res.port_id_length) + eq_(self.port_id_subtype, res.port_id_subtype) + eq_(self.port_id, res.port_id) + + def test_serialize(self): + buf = self.ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.action, res[2]) + eq_(addrconv.mac.text_to_bin(self.mac_address), res[3]) + eq_(self.port_id_length, res[4]) + eq_(self.port_id_subtype, res[5]) + eq_(self.port_id, res[6]) + + def test_serialize_with_zero(self): + ins = cfm.reply_egress_tlv(0, + self.action, + self.mac_address, + 0, + self.port_id_subtype, + self.port_id + ) + buf = ins.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self._type, res[0]) + eq_(self.length, res[1]) + eq_(self.action, res[2]) + eq_(addrconv.mac.text_to_bin(self.mac_address), res[3]) + eq_(self.port_id_length, res[4]) + eq_(self.port_id_subtype, res[5]) + eq_(self.port_id, res[6]) + + def test_len(self): + # tlv_length = type_len + length_len + value_len + eq_(1 + 2 + 12, len(self.ins)) + + def test_default_args(self): + ins = cfm.reply_egress_tlv() + buf = ins.serialize() + res = struct.unpack_from(cfm.reply_egress_tlv._PACK_STR, str(buf)) + eq_(res[0], cfm.CFM_REPLY_EGRESS_TLV) + eq_(res[1], 7) + eq_(res[2], 1) + eq_(res[3], addrconv.mac.text_to_bin('00:00:00:00:00:00')) |