diff options
Diffstat (limited to 'ryu/app/tunnel_port_updater.py')
-rw-r--r-- | ryu/app/tunnel_port_updater.py | 473 |
1 files changed, 0 insertions, 473 deletions
diff --git a/ryu/app/tunnel_port_updater.py b/ryu/app/tunnel_port_updater.py deleted file mode 100644 index 6e25c8f8..00000000 --- a/ryu/app/tunnel_port_updater.py +++ /dev/null @@ -1,473 +0,0 @@ -# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. -# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This module updates OVS tunnel ports for OpenStack integration. -""" - -import collections -from ryu import cfg -import logging -import netaddr - -from ryu import exception as ryu_exc -from ryu.app import conf_switch_key as cs_key -from ryu.app import rest_nw_id -from ryu.base import app_manager -from ryu.controller import (conf_switch, - handler, - network, - tunnels) -from ryu.lib import dpid as dpid_lib -from ryu.lib import hub -from ryu.lib.ovs import bridge as ovs_bridge - - -_TUNNEL_TYPE_TO_NW_ID = { - 'gre': rest_nw_id.NW_ID_VPORT_GRE, -} - - -class NetworkAPI(object): - """Internal adopter class for RestAPI""" - def __init__(self, network_): - super(NetworkAPI, self).__init__() - self.nw = network_ - - def update_network(self, network_id): - self.nw.update_network(network_id) - - def create_port(self, network_id, dpid, port_id): - self.nw.create_port(network_id, dpid, port_id) - - def update_port(self, network_id, dpid, port_id): - self.nw.update_port(network_id, dpid, port_id) - - def delete_port(self, network_id, dpid, port_id): - try: - self.nw.remove_port(network_id, dpid, port_id) - except (ryu_exc.NetworkNotFound, ryu_exc.PortNotFound): - pass - - -class TunnelAPI(object): - """Internal adopter class for RestTunnelAPI""" - def __init__(self, tunnels_): - super(TunnelAPI, self).__init__() - self.tunnels = tunnels_ - - def update_remote_dpid(self, dpid, port_id, remote_dpid): - self.tunnels.update_port(dpid, port_id, remote_dpid) - - def create_remote_dpid(self, dpid, port_id, remote_dpid): - self.tunnels.register_port(dpid, port_id, remote_dpid) - - def delete_port(self, dpid, port_id): - try: - self.tunnels.delete_port(dpid, port_id) - except ryu_exc.PortNotFound: - pass - - -class TunnelPort(object): - def __init__(self, dpid, port_no, local_ip, remote_ip, remote_dpid=None): - super(TunnelPort, self).__init__() - self.dpid = dpid - self.port_no = port_no - self.local_ip = local_ip - self.remote_ip = remote_ip - self.remote_dpid = remote_dpid - - def __eq__(self, other): - return (self.dpid == other.dpid and - self.port_no == other.port_no and - self.local_ip == other.local_ip and - self.remote_ip == other.remote_ip and - self.remote_dpid == other.remote_dpid) - - -class TunnelDP(object): - def __init__(self, CONF, dpid, ovsdb_addr, tunnel_ip, tunnel_type, - conf_switch_, network_api, tunnel_api, logger): - super(TunnelDP, self).__init__() - self.dpid = dpid - self.network_api = network_api - self.tunnel_api = tunnel_api - self.logger = logger - - self.ovs_bridge = ovs_bridge.OVSBridge(CONF, dpid, ovsdb_addr) - - self.tunnel_ip = tunnel_ip - self.tunnel_type = tunnel_type - self.tunnel_nw_id = _TUNNEL_TYPE_TO_NW_ID[tunnel_type] - self.tunnels = {} # port number -> TunnelPort - - self.conf_switch = conf_switch_ - self.inited = False - - self.req_q = hub.Queue() - self.thr = hub.spawn(self._serve_loop) - - def _init(self): - self.ovs_bridge.init() - for tp in self.ovs_bridge.get_tunnel_ports(self.tunnel_type): - if tp.local_ip != self.tunnel_ip: - self.logger.warn('unknown tunnel port %s', tp) - continue - - remote_dpid = self.conf_switch.find_dpid(cs_key.OVS_TUNNEL_ADDR, - tp.remote_ip) - self.tunnels[tp.ofport] = TunnelPort(self.dpid, tp.ofport, - self.tunnel_ip, tp.remote_ip, - remote_dpid) - if remote_dpid: - self._api_update(tp.ofport, remote_dpid) - - self.conf_switch = None - self.inited = True - - def _api_update(self, port_no, remote_dpid): - self.network_api.update_port(self.tunnel_nw_id, self.dpid, port_no) - self.tunnel_api.update_remote_dpid(self.dpid, port_no, remote_dpid) - - def _api_delete(self, port_no): - self.network_api.delete_port(self.tunnel_nw_id, self.dpid, port_no) - self.tunnel_api.delete_port(self.dpid, port_no) - - def _update_remote(self, remote_dpid, remote_ip): - if self.dpid == remote_dpid: - if self.tunnel_ip == remote_ip: - return - - # tunnel ip address is changed. - self.logger.warn('local ip address is changed %s: %s -> %s', - dpid_lib.dpid_to_str(remote_dpid), - self.tunnel_ip, remote_ip) - # recreate tunnel ports. - for tp in list(self.tunnels.values()): - if tp.remote_dpid is None: - # TODO:XXX - continue - - self._del_tunnel_port(tp.port_no, tp.local_ip, tp.remote_ip) - - new_tp = self._add_tunnel_port(tp.remote_dpid, tp.remote_ip) - self._api_update(new_tp.ofport, tp.remote_dpid) - return - - if self.tunnel_ip == remote_ip: - self.logger.error('ip conflict: %s %s %s', - dpid_lib.dpid_to_str(self.dpid), - dpid_lib.dpid_to_str(remote_dpid), remote_ip) - # XXX What should we do? - return - - for tp in list(self.tunnels.values()): - if tp.remote_dpid == remote_dpid: - if tp.remote_ip == remote_ip: - self._api_update(tp.port_no, remote_dpid) - continue - - self.logger.warn('remote ip address is changed %s: %s -> %s', - dpid_lib.dpid_to_str(remote_dpid), - tp.remote_ip, remote_ip) - self._del_tunnel_port(tp.port_no, self.tunnel_ip, tp.remote_ip) - - new_tp = self._add_tunnel_port(remote_dpid, remote_ip) - self._api_update(new_tp.ofport, remote_dpid) - elif tp.remote_ip == remote_ip: - assert tp.remote_dpid is None - self._api_update(tp.port_no, remote_dpid) - tp.remote_dpid = remote_dpid - - @staticmethod - def _to_hex(ip_addr): - # assuming IPv4 address - assert netaddr.IPAddress(ip_addr).ipv4() - return "%02x%02x%02x%02x" % netaddr.IPAddress(ip_addr).words - - @staticmethod - def _port_name(local_ip, remote_ip): - # ovs requires requires less or equals to 14 bytes length - # gre<remote>-<local lsb> - _PORT_NAME_LENGTH = 14 - local_hex = TunnelDP._to_hex(local_ip) - remote_hex = TunnelDP._to_hex(remote_ip) - return ("gre%s-%s" % (remote_hex, local_hex))[:_PORT_NAME_LENGTH] - - def _tunnel_port_exists(self, remote_dpid, remote_ip): - return any(tp.remote_dpid == remote_dpid and tp.remote_ip == remote_ip - for tp in self.tunnels.values()) - - def _add_tunnel_port(self, remote_dpid, remote_ip): - self.logger.debug('add_tunnel_port local %s %s remote %s %s', - dpid_lib.dpid_to_str(self.dpid), self.tunnel_ip, - dpid_lib.dpid_to_str(remote_dpid), remote_ip) - if self._tunnel_port_exists(remote_dpid, remote_ip): - self.logger.debug('add_tunnel_port nop') - return - - self.logger.debug('add_tunnel_port creating port') - port_name = self._port_name(self.tunnel_ip, remote_ip) - self.ovs_bridge.add_tunnel_port(port_name, self.tunnel_type, - self.tunnel_ip, remote_ip, 'flow') - - tp = self.ovs_bridge.get_tunnel_port(port_name, self.tunnel_type) - self.tunnels[tp.ofport] = TunnelPort(self.dpid, tp.ofport, - tp.local_ip, tp.remote_ip, - remote_dpid) - self.network_api.create_port(self.tunnel_nw_id, self.dpid, tp.ofport) - self.tunnel_api.create_remote_dpid(self.dpid, tp.ofport, remote_dpid) - return tp - - def _del_tunnel_port(self, port_no, local_ip, remote_ip): - port_name = self._port_name(local_ip, remote_ip) - self.ovs_bridge.del_port(port_name) - del self.tunnels[port_no] - self._api_delete(port_no) - - def _del_tunnel_port_ip(self, remote_ip): - for tp in self.tunnels.values(): - if tp.remote_ip == remote_ip: - self._del_tunnel_port(tp.port_no, self.tunnel_ip, remote_ip) - break - - # serialize requests to this OVS DP - _RequestUpdateRemote = collections.namedtuple('_RequestUpdateRemote', - ('remote_dpid', 'remote_ip')) - _RequestAddTunnelPort = collections.namedtuple('_RequestAddTunnelPort', - ('remote_dpid', - 'remote_ip')) - _RequestDelTunnelPort = collections.namedtuple('_RequestDelTunnelPort', - ('remote_ip')) - - class _RequestClose(object): - pass - - def request_update_remote(self, remote_dpid, remote_ip): - self.req_q.put(self._RequestUpdateRemote(remote_dpid, remote_ip)) - - def request_add_tunnel_port(self, remote_dpid, remote_ip): - self.req_q.put(self._RequestAddTunnelPort(remote_dpid, remote_ip)) - - def request_del_tunnel_port(self, remote_ip): - self.req_q.put(self._RequestDelTunnelPort(remote_ip)) - - def close(self): - # self.thr.kill() - self.req_q.put(self._RequestClose()) - self.thr.join() - self.thr = None - - def _serve_loop(self): - # TODO:XXX backoff timeout - # TOOD:XXX and then, abandon and notify the caller(TunnelPortUpdater) - - # TODO: if possible, squash requests? - # For example, RequestAddTunnelPort and RequestDelTunnelPort - # with same dpid are in the queue. AddTunnelPort request - # can be skipped. - # When ovsdb-server and vswitchd are over-loaded - # (or connection to ovsdb are unstable), squashing request - # would increase stability a bit? - # But unsure how effective it would be. - - if not self.inited: - try: - self._init() - except hub.Timeout: - self.logger.warn('_init timeouted') - - req = None - while True: - if req is None: - req = self.req_q.get() - if isinstance(req, self._RequestClose): - return - - try: - if not self.inited: - self._init() - - # shoud use dispatcher? - if isinstance(req, self._RequestUpdateRemote): - self.logger.debug('update_remote') - self._update_remote(req.remote_dpid, req.remote_ip) - elif isinstance(req, self._RequestAddTunnelPort): - self.logger.debug('add_tunnel_port') - self._add_tunnel_port(req.remote_dpid, req.remote_ip) - elif isinstance(req, self._RequestDelTunnelPort): - self.logger.debug('del_tunnel_port') - self._del_tunnel_port_ip(req.remote_ip) - else: - self.logger.error('unknown request %s', req) - except hub.Timeout: - # timeout. try again - self.logger.warn('timeout try again') - continue - else: - # Done. move onto next request - req = None - - -class TunnelDPSet(dict): - """ dpid -> TunndlDP """ - pass - - -# import collections -# class TunnelRequests(collections.defaultdict(set)): -class TunnelRequests(dict): - def add(self, dpid0, dpid1): - self.setdefault(dpid0, set()).add(dpid1) - self.setdefault(dpid1, set()).add(dpid0) - - def remove(self, dpid0, dpid1): - self[dpid0].remove(dpid1) - self[dpid1].remove(dpid0) - - def get_remote(self, dpid): - return self.setdefault(dpid, set()) - - -class TunnelPortUpdater(app_manager.RyuApp): - _CONTEXTS = { - 'conf_switch': conf_switch.ConfSwitchSet, - 'network': network.Network, - 'tunnels': tunnels.Tunnels, - } - - def __init__(self, *args, **kwargs): - super(TunnelPortUpdater, self).__init__(args, kwargs) - self.CONF.register_opts([ - cfg.StrOpt('tunnel-type', default='gre', - help='tunnel type for ovs tunnel port') - ]) - self.tunnel_type = self.CONF.tunnel_type - self.cs = kwargs['conf_switch'] - self.nw = kwargs['network'] - self.tunnels = kwargs['tunnels'] - self.tunnel_dpset = TunnelDPSet() - self.tunnel_requests = TunnelRequests() - - self.network_api = NetworkAPI(self.nw) - self.tunnel_api = TunnelAPI(self.tunnels) - self.network_api.update_network( - _TUNNEL_TYPE_TO_NW_ID[self.tunnel_type]) - - def _ovsdb_update(self, dpid, ovsdb_addr, ovs_tunnel_addr): - self.logger.debug('_ovsdb_update %s %s %s', - dpid_lib.dpid_to_str(dpid), ovsdb_addr, - ovs_tunnel_addr) - if dpid not in self.tunnel_dpset: - # TODO:XXX changing ovsdb_addr, ovs_tunnel_addr - tunnel_dp = TunnelDP(self.CONF, dpid, ovsdb_addr, ovs_tunnel_addr, - self.tunnel_type, self.cs, - self.network_api, self.tunnel_api, - self.logger) - self.tunnel_dpset[dpid] = tunnel_dp - - tunnel_dp = self.tunnel_dpset.get(dpid) - assert tunnel_dp - self._add_tunnel_ports(tunnel_dp, - self.tunnel_requests.get_remote(dpid)) - - @handler.set_ev_cls(conf_switch.EventConfSwitchSet) - def conf_switch_set_handler(self, ev): - self.logger.debug('conf_switch_set_handler %s %s %s', - dpid_lib.dpid_to_str(ev.dpid), ev.key, ev.value) - dpid = ev.dpid - if (ev.key == cs_key.OVSDB_ADDR or ev.key == cs_key.OVS_TUNNEL_ADDR): - if ((dpid, cs_key.OVSDB_ADDR) in self.cs and - (dpid, cs_key.OVS_TUNNEL_ADDR) in self.cs): - self._ovsdb_update( - dpid, self.cs.get_key(dpid, cs_key.OVSDB_ADDR), - self.cs.get_key(dpid, cs_key.OVS_TUNNEL_ADDR)) - - if ev.key == cs_key.OVS_TUNNEL_ADDR: - for tunnel_dp in self.tunnel_dpset.values(): - tunnel_dp.request_update_remote(ev.dpid, ev.value) - - @handler.set_ev_cls(conf_switch.EventConfSwitchDel) - def conf_switch_del_handler(self, ev): - # TODO:XXX - pass - - def _add_tunnel_ports(self, tunnel_dp, remote_dpids): - self.logger.debug('_add_tunnel_ports %s %s', tunnel_dp, remote_dpids) - for remote_dpid in remote_dpids: - remote_dp = self.tunnel_dpset.get(remote_dpid) - if remote_dp is None: - continue - tunnel_dp.request_add_tunnel_port(remote_dp.dpid, - remote_dp.tunnel_ip) - remote_dp.request_add_tunnel_port(tunnel_dp.dpid, - tunnel_dp.tunnel_ip) - - def _vm_port_add(self, network_id, dpid): - self.logger.debug('_vm_port_add %s %s', network_id, - dpid_lib.dpid_to_str(dpid)) - dpids = self.nw.get_dpids(network_id) - dpids.remove(dpid) - for remote_dpid in dpids: - self.tunnel_requests.add(dpid, remote_dpid) - - tunnel_dp = self.tunnel_dpset.get(dpid) - if tunnel_dp is None: - return - self._add_tunnel_ports(tunnel_dp, dpids) - - def _vm_port_del(self, network_id, dpid): - self.logger.debug('_vm_port_del %s %s', network_id, - dpid_lib.dpid_to_str(dpid)) - if len(self.nw.get_ports(dpid, network_id)) > 0: - return - - tunnel_networks = set(p.network_id - for p in self.nw.get_networks(dpid)) - tunnel_networks.discard(network_id) - tunnel_networks.difference_update(rest_nw_id.RESERVED_NETWORK_IDS) - dpids = self.nw.get_dpids(network_id).copy() - dpids.discard(dpid) - del_dpids = [] - for remote_dpid in dpids: - remote_networks = set(p.network_id - for p in self.nw.get_networks(remote_dpid)) - if tunnel_networks & remote_networks: - continue - self.tunnel_requests.remove(dpid, remote_dpid) - del_dpids.append(remote_dpid) - - tunnel_dp = self.tunnel_dpset.get(dpid) - if tunnel_dp is None: - return - for remote_dpid in del_dpids: - remote_dp = self.tunnel_dpset.get(remote_dpid) - if remote_dp is None: - continue - tunnel_dp.request_del_tunnel_port(remote_dp.tunnel_ip) - remote_dp.request_del_tunnel_port(tunnel_dp.tunnel_ip) - - @handler.set_ev_cls(network.EventNetworkPort) - def network_port_handler(self, ev): - self.logger.debug('network_port_handler %s', ev) - if ev.network_id in rest_nw_id.RESERVED_NETWORK_IDS: - return - - if ev.add_del: - self._vm_port_add(ev.network_id, ev.dpid) - else: - self._vm_port_del(ev.network_id, ev.dpid) |