diff options
author | watanabe.fumitaka <watanabe.fumitaka@nttcom.co.jp> | 2013-02-25 14:30:27 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-02-25 14:33:19 +0900 |
commit | e3544031e019c617dcadb822372df02d3dd99d88 (patch) | |
tree | 55ff3fdad46718d9961c67444565cc242a3d4c6c | |
parent | 2a77267bd32a40a4c87813cbdb232893fadc1fb9 (diff) |
xflow: add sflow support
This patch implements sflow packet parser.
I confirmed that it can parse a sflow packet from Open vSwitch.
Signed-off-by: WATANABE Fumitaka <watanabe.fumitaka@nttcom.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/lib/xflow/sflow.py | 436 |
1 files changed, 436 insertions, 0 deletions
diff --git a/ryu/lib/xflow/sflow.py b/ryu/lib/xflow/sflow.py new file mode 100644 index 00000000..312e6ee7 --- /dev/null +++ b/ryu/lib/xflow/sflow.py @@ -0,0 +1,436 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +import logging + +SFLOW_V2 = 0x00000002 +SFLOW_V3 = 0x00000003 +SFLOW_V4 = 0x00000004 +SFLOW_V5 = 0x00000005 + +LOG = logging.getLogger('ryu.lib.xflow.sflow') + + +class sFlow(object): + _PACK_STR = '!i' + _SFLOW_VERSIONS = {} + + @staticmethod + def register_sflow_version(version): + def _register_sflow_version(cls): + sFlow._SFLOW_VERSIONS[version] = cls + return cls + return _register_sflow_version + + def __init__(self): + super(sFlow, self).__init__() + + @classmethod + def parser(cls, buf): + (version,) = struct.unpack_from(cls._PACK_STR, buf) + + cls_ = cls._SFLOW_VERSIONS.get(version, None) + if cls_: + return cls_.parser(buf) + else: + return None + + +@sFlow.register_sflow_version(SFLOW_V5) +class sFlowV5(object): + _PACK_STR = '!ii' + _PACK_STR_IPV4 = '!iiIIIII' + _PACK_STR_IPV6 = '!ii4IIIII' + _AGENT_IPTYPE_V4 = 1 + _AGENT_IPTYPE_V6 = 2 + _MIN_LEN_V4 = struct.calcsize(_PACK_STR_IPV4) + _MIN_LEN_V6 = struct.calcsize(_PACK_STR_IPV6) + + def __init__(self, version, address_type, agent_address, sub_agent_id, + sequence_number, uptime, samples_num, samples): + super(sFlowV5, self).__init__() + self.version = version + self.address_type = address_type + self.agent_address = agent_address + self.sub_agent_id = sub_agent_id + self.sequence_number = sequence_number + self.uptime = uptime + self.samples_num = samples_num + self.samples = samples + + @classmethod + def parser(cls, buf): + (version, address_type) = struct.unpack_from(cls._PACK_STR, buf) + + if address_type == cls._AGENT_IPTYPE_V4: + pack_str = cls._PACK_STR_IPV4 + min_len = cls._MIN_LEN_V4 + elif address_type == cls._AGENT_IPTYPE_V6: + pack_str = cls._PACK_STR_IPV6 + min_len = cls._MIN_LEN_V6 + else: + LOG.info("Unknown address_type. sFlowV5.address_type=%d" + % address_type) + return None + + (version, address_type, agent_address, sub_agent_id, sequence_number, + uptime, samples_num) = struct.unpack_from(pack_str, buf) + offset = min_len + + samples = [] + + while len(buf) > offset: + sample = sFlowV5Sample.parser(buf, offset) + offset += sFlowV5Sample.MIN_LEN + sample.sample_length + samples.append(sample) + + msg = cls(version, address_type, agent_address, sub_agent_id, + sequence_number, uptime, samples_num, samples) + + return msg + + +class sFlowV5Sample(object): + _PACK_STR = '!II' + MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, enterprise, sample_format, sample_length, sample): + super(sFlowV5Sample, self).__init__() + self.enterprise = enterprise + self.sample_format = sample_format + self.sample_length = sample_length + self.sample = sample + + @classmethod + def parser(cls, buf, offset): + (sampledata_format, + sample_length) = struct.unpack_from(cls._PACK_STR, buf, offset) + + format_mask = 0xfff + enterprise_shiftbit = 12 + + sample_format = sampledata_format & format_mask + enterprise = sampledata_format >> enterprise_shiftbit + + offset += cls.MIN_LEN + + if sample_format == 1: + # Flow Sample + sample = sFlowV5FlowSample.parser(buf, offset) + elif sample_format == 2: + # Counter Sample + sample = sFlowV5CounterSample.parser(buf, offset) + else: + #TODO: + # sample_format == 3 : Expanded Flow Sample + # sample_format == 4 : Expanded Counter Sample + LOG.info("Unknown format. sFlowV5Sample.sample_format=%d" + % sample_format) + pack_str = '!%sc' % sample_length + sample = struct.unpack_from(pack_str, buf, offset) + + msg = cls(enterprise, sample_format, sample_length, sample) + + return msg + + +class sFlowV5FlowSample(object): + _PACK_STR = '!IIIIIIII' + + def __init__(self, sequence_number, source_id_type, source_id_index, + sampling_rate, sample_pool, drops, input_if, output_if, + flow_records_num, flow_records): + super(sFlowV5FlowSample, self).__init__() + self.sequence_number = sequence_number + self.source_id_type = source_id_type + self.source_id_index = source_id_index + self.sampling_rate = sampling_rate + self.sample_pool = sample_pool + self.drops = drops + self.input_if = input_if + self.output_if = output_if + self.flow_records_num = flow_records_num + self.flow_records = flow_records + + @classmethod + def parser(cls, buf, offset): + (sequence_number, source_id, sampling_rate, + sample_pool, drops, input_if, output_if, + flow_records_num) = struct.unpack_from(cls._PACK_STR, buf, offset) + + index_mask = 0xffffff + type_shiftbit = 24 + + source_id_index = source_id & index_mask + source_id_type = source_id >> type_shiftbit + + offset += struct.calcsize(cls._PACK_STR) + + flow_records = [] + + for i in range(flow_records_num): + flow_record = sFlowV5FlowRecord.parser(buf, offset) + offset += sFlowV5FlowRecord.MIN_LEN + flow_record.flow_data_length + flow_records.append(flow_record) + + msg = cls(sequence_number, source_id_type, source_id_index, + sampling_rate, sample_pool, drops, input_if, output_if, + flow_records_num, flow_records) + + return msg + + +class sFlowV5FlowRecord(object): + _PACK_STR = '!II' + MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, enterprise, flow_data_format, + flow_data_length, flow_data): + super(sFlowV5FlowRecord, self).__init__() + self.enterprise = enterprise + self.flow_data_format = flow_data_format + self.flow_data_length = flow_data_length + self.flow_data = flow_data + + @classmethod + def parser(cls, buf, offset): + (flowdata_format, + flow_data_length) = struct.unpack_from(cls._PACK_STR, buf, offset) + + format_mask = 0xfff + enterprise_shiftbit = 12 + + flow_data_format = flowdata_format & format_mask + enterprise = flowdata_format >> enterprise_shiftbit + + offset += cls.MIN_LEN + + if flow_data_format == 1: + # Raw Packet Header + flow_data = sFlowV5RawPacketHeader.parser(buf, offset) + elif flow_data_format == 1001: + # Extended Switch Data + flow_data = sFlowV5ExtendedSwitchData.parser(buf, offset) + else: + #TODO: + # flow_data_format == 2 : Ethernet Frame Data + # flow_data_format == 3 : IPv4 Data + # flow_data_format == 4 : IPv6 Data + # flow_data_format == 1002 : Extended Router Data + # flow_data_format == 1003 : Extended Gateway Data + # flow_data_format == 1004 : Extended User Data + # flow_data_format == 1005 : Extended Url Data + # flow_data_format == 1006 : Extended MPLS Data + # flow_data_format == 1007 : Extended NAT Data + # flow_data_format == 1008 : Extended MPLS Tunnel + # flow_data_format == 1009 : Extended MPLS VC + # flow_data_format == 1010 : Extended MPLS FEC + # flow_data_format == 1011 : Extended MPLS LVP FEC + # flow_data_format == 1012 : Extended VLAN tunnel + LOG.info("Unknown format. sFlowV5FlowRecord.flow_data_format=%d" + % flow_data_format) + pack_str = '!%sc' % flow_data_length + flow_data = struct.unpack_from(pack_str, buf, offset) + + msg = cls(enterprise, flow_data_format, flow_data_length, flow_data) + + return msg + + +class sFlowV5RawPacketHeader(object): + _PACK_STR = '!iIII' + + def __init__(self, header_protocol, frame_length, stripped, + header_size, header): + super(sFlowV5RawPacketHeader, self).__init__() + self.header_protocol = header_protocol + self.frame_length = frame_length + self.stripped = stripped + self.header_size = header_size + self.header = header + + @classmethod + def parser(cls, buf, offset): + (header_protocol, frame_length, stripped, + header_size) = struct.unpack_from(cls._PACK_STR, buf, offset) + + offset += struct.calcsize(cls._PACK_STR) + + header_pack_str = '!%sc' % header_size + header = struct.unpack_from(header_pack_str, buf, offset) + + msg = cls(header_protocol, frame_length, stripped, header_size, header) + return msg + + +class sFlowV5ExtendedSwitchData(object): + _PACK_STR = '!IIII' + + def __init__(self, src_vlan, src_priority, dest_vlan, dest_priority): + super(sFlowV5ExtendedSwitchData, self).__init__() + self.src_vlan = src_vlan + self.src_priority = src_priority + self.dest_vlan = dest_vlan + self.dest_priority = dest_priority + + @classmethod + def parser(cls, buf, offset): + (src_vlan, src_priority, dest_vlan, + dest_priority) = struct.unpack_from(cls._PACK_STR, buf, offset) + + msg = cls(src_vlan, src_priority, dest_vlan, dest_priority) + return msg + + +class sFlowV5CounterSample(object): + _PACK_STR = '!III' + + def __init__(self, sequence_number, source_id_type, source_id_index, + counters_records_num, counters_records): + super(sFlowV5CounterSample, self).__init__() + self.sequence_number = sequence_number + self.source_id_type = source_id_type + self.source_id_index = source_id_index + self.counters_records_num = counters_records_num + self.counters_records = counters_records + + @classmethod + def parser(cls, buf, offset): + (sequence_number, source_id, + counters_records_num) = struct.unpack_from(cls._PACK_STR, buf, offset) + + index_mask = 0xffffff + type_shiftbit = 24 + + source_id_index = source_id & index_mask + source_id_type = source_id >> type_shiftbit + + offset += struct.calcsize(cls._PACK_STR) + + counters_records = [] + + for i in range(counters_records_num): + counter_record = sFlowV5CounterRecord.parser(buf, offset) + offset += sFlowV5CounterRecord.MIN_LEN + offset += counter_record.counter_data_length + counters_records.append(counter_record) + + msg = cls(sequence_number, source_id_type, source_id_index, + counters_records_num, counters_records) + + return msg + + +class sFlowV5CounterRecord(object): + _PACK_STR = '!II' + MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, enterprise, counter_data_format, + counter_data_length, counter_data): + super(sFlowV5CounterRecord, self).__init__() + self.enterprise = enterprise + self.counter_data_format = counter_data_format + self.counter_data_length = counter_data_length + self.counter_data = counter_data + + @classmethod + def parser(cls, buf, offset): + (counterdata_format, + counter_data_length) = struct.unpack_from(cls._PACK_STR, buf, offset) + + format_mask = 0xfff + enterprise_shiftbit = 12 + + counter_data_format = counterdata_format & format_mask + enterprise = counterdata_format >> enterprise_shiftbit + + offset += cls.MIN_LEN + + if counter_data_format == 1: + # Generic Interface Counters + counter_data = sFlowV5GenericInterfaceCounters.parser(buf, offset) + else: + #TODO: + # counter_data_format == 2 : Ethernet Interface Counters + # counter_data_format == 3 : Token Ring Counters + # counter_data_format == 4 : 100 BaseVG Interface Counters + # counter_data_format == 5 : VLAN Counters + # counter_data_format == 1001 : Processor Information + LOG.info("Unknown format. " + + "sFlowV5CounterRecord.counter_data_format=%d" + % counter_data_format) + pack_str = '!%sc' % counter_data_length + counter_data = struct.unpack_from(pack_str, buf, offset) + + msg = cls(enterprise, counter_data_format, + counter_data_length, counter_data) + + return msg + + +class sFlowV5GenericInterfaceCounters(object): + _PACK_STR = '!IIQIIQIIIIIIQIIIIII' + + def __init__(self, ifIndex, ifType, ifSpeed, ifDirection, + ifAdminStatus, ifOperStatus, ifInOctets, ifInUcastPkts, + ifInMulticastPkts, ifInBroadcastPkts, ifInDiscards, + ifInErrors, ifInUnknownProtos, ifOutOctets, + ifOutUcastPkts, ifOutMulticastPkts, ifOutBroadcastPkts, + ifOutDiscards, ifOutErrors, ifPromiscuousMode): + super(sFlowV5GenericInterfaceCounters, self).__init__() + self.ifIndex = ifIndex + self.ifType = ifType + self.ifSpeed = ifSpeed + self.ifDirection = ifDirection + self.ifAdminStatus = ifAdminStatus + self.ifOperStatus = ifOperStatus + self.ifInOctets = ifInOctets + self.ifInUcastPkts = ifInUcastPkts + self.ifInMulticastPkts = ifInMulticastPkts + self.ifInBroadcastPkts = ifInBroadcastPkts + self.ifInDiscards = ifInDiscards + self.ifInErrors = ifInErrors + self.ifInUnknownProtos = ifInUnknownProtos + self.ifOutOctets = ifOutOctets + self.ifOutUcastPkts = ifOutUcastPkts + self.ifOutMulticastPkts = ifOutMulticastPkts + self.ifOutBroadcastPkts = ifOutBroadcastPkts + self.ifOutDiscards = ifOutDiscards + self.ifOutErrors = ifOutErrors + self.ifPromiscuousMode = ifPromiscuousMode + + @classmethod + def parser(cls, buf, offset): + (ifIndex, ifType, ifSpeed, ifDirection, ifStatus, ifInOctets, + ifInUcastPkts, ifInMulticastPkts, ifInBroadcastPkts, ifInDiscards, + ifInErrors, ifInUnknownProtos, ifOutOctets, ifOutUcastPkts, + ifOutMulticastPkts, ifOutBroadcastPkts, ifOutDiscards, ifOutErrors, + ifPromiscuousMode,) = struct.unpack_from(cls._PACK_STR, buf, offset) + + ifStatus_mask = 0x1 + ifAdminStatus_shiftbit = 1 + + ifOperStatus = ifStatus & ifStatus_mask + ifAdminStatus = ifStatus >> ifAdminStatus_shiftbit & ifStatus_mask + + msg = cls(ifIndex, ifType, ifSpeed, ifDirection, ifAdminStatus, + ifOperStatus, ifInOctets, ifInUcastPkts, + ifInMulticastPkts, ifInBroadcastPkts, ifInDiscards, + ifInErrors, ifInUnknownProtos, ifOutOctets, + ifOutUcastPkts, ifOutMulticastPkts, ifOutBroadcastPkts, + ifOutDiscards, ifOutErrors, ifPromiscuousMode) + + return msg |