diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-02-05 20:10:56 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-02-05 23:38:04 +0900 |
commit | 6e76328bbc6243a298cd93a9954cacd45a21e176 (patch) | |
tree | 4e434dbcb830d6ee9faebce400eb95486753a096 | |
parent | 670e2b46b0c53e488f6fabe118164719a01ceff1 (diff) |
app: Add quantum adapter
Based on the following patch:
From: Yoshihiro Kaneko <ykaneko0929@gmail.com>
Subject: app: Add quantum adapter
Thanks to Kaneko for finding and fixing my bugs.
Signed-off-by: Yoshihiro Kaneko <ykaneko0929@gmail.com>
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/app/quantum_adapter.py | 423 | ||||
-rw-r--r-- | ryu/flags.py | 22 |
2 files changed, 445 insertions, 0 deletions
diff --git a/ryu/app/quantum_adapter.py b/ryu/app/quantum_adapter.py new file mode 100644 index 00000000..6c407e7d --- /dev/null +++ b/ryu/app/quantum_adapter.py @@ -0,0 +1,423 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gflags +import logging + +from quantumclient import client as q_client +from quantumclient.common import exceptions as q_exc +from quantumclient.v2_0 import client as q_clientv2 + +from ryu.app import conf_switch_key as cs_key +from ryu.app import rest_nw_id +from ryu.base import app_manager +from ryu.controller import (conf_switch, + dpset, + event, + handler, + network) +from ryu.lib import dpid as dpid_lib +from ryu.lib import mac as mac_lib +from ryu.lib import quantum_ifaces +from ryu.lib.ovs import bridge +from ryu.lib.quantum_ifaces import QuantumIfaces + + +from gevent import monkey +monkey.patch_all() + + +LOG = logging.getLogger(__name__) +FLAGS = gflags.FLAGS + + +def _get_auth_token(): + httpclient = q_client.HTTPClient( + username=FLAGS.quantum_admin_username, + tenant_name=FLAGS.quantum_admin_tenant_name, + password=FLAGS.quantum_admin_password, + auth_url=FLAGS.quantum_admin_auth_url, + timeout=FLAGS.quantum_url_timeout, + auth_strategy=FLAGS.quantum_auth_strategy) + try: + httpclient.authenticate() + except (q_exc.Unauthorized, q_exc.Forbidden, q_exc.EndpointNotFound) as e: + LOG.error("authentication failure: %s", e) + return None + # LOG.debug("_get_auth_token: token=%s", httpclient.auth_token) + return httpclient.auth_token + + +def _get_quantum_client(token): + if token: + my_client = q_clientv2.Client( + endpoint_url=FLAGS.quantum_url, + token=token, timeout=FLAGS.quantum_url_timeout) + else: + my_client = q_clientv2.Client( + endpoint_url=FLAGS.quantum_url, + auth_strategy=None, timeout=FLAGS.quantum_url_timeout) + return my_client + + +class OVSPort(object): + PORT_ERROR = -1 + PORT_UNKNOWN = 0 + PORT_GATEWAY = 1 + PORT_VETH_GATEWAY = 2 + PORT_GUEST = 3 + PORT_TUNNEL = 4 + + # extra-ids: 'attached-mac', 'iface-id', 'iface-status', 'vm-uuid' + def __init__(self, ofport, port_name): + super(OVSPort, self).__init__() + self.ofport = ofport + self.name = port_name + self.type = None + self.ext_ids = {} + self.options = {} + + def update(self, port): + self.__dict__.update((key, port[key]) for key + in ['name', 'ofport', 'type'] + if key in port) + if 'external_ids' in port: + self.ext_ids = dict(port['external_ids']) + if 'options' in port: + self.options = dict(port['options']) + + def get_port_type(self): + if not isinstance(self.ofport, int): + return self.PORT_ERROR + if self.type == 'internal' and 'iface-id' in self.ext_ids: + return self.PORT_GATEWAY + if self.type == '' and 'iface-id' in self.ext_ids: + return self.PORT_VETH_GATEWAY + if (self.type == 'gre' and 'local_ip' in self.options and + 'remote_ip' in self.options): + return self.PORT_TUNNEL + if self.type == '' and 'vm-uuid' in self.ext_ids: + return self.PORT_GUEST + return self.PORT_UNKNOWN + + def __str__(self): + return "type=%s ofport=%s name=%s, ext_ids=%s options=%s" % ( + self.type, self.ofport, self.name, self.ext_ids, self.options) + + def __eq__(self, other): + return (other is not None and + self.ofport == other.ofport and + self.type == other.type and + self.ext_ids == other.ext_ids and + self.options == other.options) + + +class OVSSwitch(object): + def __init__(self, dpid, nw, ifaces): + # TODO: clean up + token = None + if FLAGS.quantum_auth_strategy: + token = _get_auth_token() + q_api = _get_quantum_client(token) + + self.dpid = dpid + self.network_api = nw + self.ifaces = ifaces + self.q_api = q_api + self.ctrl_addr = FLAGS.quantum_controller_addr + + self.ovsdb_addr = None + self.tunnel_ip = None + + self.ovs_bridge = None + self.ports = {} # port_no -> OVSPort + + super(OVSSwitch, self).__init__() + + def set_ovsdb_addr(self, dpid, ovsdb_addr): + # easy check if the address format valid + LOG.debug('set_ovsdb_addr dpid %s ovsdb_addr %s', + dpid_lib.dpid_to_str(dpid), ovsdb_addr) + _proto, _host, _port = ovsdb_addr.split(':') + + old_address = self.ovsdb_addr + if old_address == ovsdb_addr: + return + if ovsdb_addr is None: + # TODO: clean up this ovs switch + if self.ovs_bridge: + self.ovs_bridge.del_controller() + self.ovs_bridge = None + return + self.ovsdb_addr = ovsdb_addr + if self.ovs_bridge is None: + LOG.debug('ovsdb: adding ports %s', self.ports) + ovs_bridge = bridge.OVSBridge(dpid, ovsdb_addr) + self.ovs_bridge = ovs_bridge + ovs_bridge.init() + # TODO: for multi-controller + # not overwrite controllers, but append this controller + ovs_bridge.set_controller([self.ctrl_addr]) + for port in self.ports.values(): + LOG.debug('adding port %s', port) + self.update_port(port.ofport, port.name, True) + + def _update_external_port(self, port, add=True): + if add: + self.network_api.update_port(rest_nw_id.NW_ID_EXTERNAL, + self.dpid, port.ofport) + else: + self.network_api.remove_port(rest_nw_id.NW_ID_EXTERNAL, + self.dpid, port.ofport) + + def _update_vif_port(self, port, add=True): + LOG.debug("_update_vif_port: %s %s", port, add) + iface_id = port.ext_ids.get('iface-id') + if iface_id is None: + return + + if not add: + ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS) + for p in ports: + dpid = p.get(QuantumIfaces.SUBKEY_DATAPATH_ID) + if dpid is None: + continue + if dpid != self.dpid: + continue + + other_ovs_ports = self.ifaces.del_key(iface_id, + QuantumIfaces.KEY_PORTS, + p) + if other_ovs_ports: + # When live-migration, one of the two OVS ports is deleted. + return + + port_data = { + 'datapath_id': dpid_lib.dpid_to_str(self.dpid), + 'port_no': port.ofport, + + # In order to set + # port.status = quantum.common.constants.PORT_STATUS_DOWN + # port.status can't be changed via rest api directly, + # so resort to ryu-specical parameter to tell it. + 'deleted': True + } + body = {'port': port_data} + # LOG.debug("port-body = %s", body) + + try: + self.q_api.update_port(port.ext_ids['iface-id'], body) + except (q_exc.ConnectionFailed, q_exc.QuantumClientException) as e: + LOG.error("quantum update port failed: %s", e) + # TODO: When authentication failure occurred, + # it should get auth token again + return + + # update {network, port, mac} + try: + network_id = self.ifaces.get_key(iface_id, + QuantumIfaces.KEY_NETWORK_ID) + except KeyError: + return + self.network_api.update_network(network_id) + self.network_api.update_port(network_id, self.dpid, port.ofport) + mac = port.ext_ids.get('attached-mac') + if mac: + self.network_api.update_mac(network_id, self.dpid, port.ofport, + mac_lib.haddr_to_bin(mac)) + + def update_port(self, port_no, port_name, add): + port_name = port_name.rstrip('\x00') + LOG.debug('update_port port_no %d %s %s', port_no, port_name, add) + assert port_name is not None + old_port = self.ports.get(port_no) + if not add: + new_port = None + self.ports.pop(port_no, None) + else: + new_port = OVSPort(port_no, port_name) + if self.ovs_bridge: + port_cfg = self.ovs_bridge.get_quantum_ports(port_name) + if port_cfg: + if 'ofport' not in port_cfg: + port_cfg['ofport'] = port_no + if port_cfg['ofport'] != port_no: + LOG.warn('inconsistent port_no: %d port_cfg %s', + port_no, port_cfg) + return + if port_cfg['name'] != port_name: + LOG.warn('inconsistent port_name: %s port_cfg %s', + port_name, port_cfg) + return + new_port.update(port_cfg) + + self.ports[port_no] = new_port + iface_id = new_port.ext_ids.get('iface-id') + if iface_id: + p = {QuantumIfaces.SUBKEY_DATAPATH_ID: self.dpid, + QuantumIfaces.SUBKEY_OFPORT: port_no, + QuantumIfaces.SUBKEY_NAME: port_name} + self.ifaces.update_key(iface_id, QuantumIfaces.KEY_PORTS, p) + + if old_port == new_port: + return + + if not new_port: + port_type = old_port.get_port_type() + if port_type == OVSPort.PORT_ERROR: + return + elif port_type == OVSPort.PORT_UNKNOWN: + # LOG.info("delete external port: %s", old_port) + self._update_external_port(old_port, add=False) + else: + # LOG.info("delete port: %s", old_port) + if port_type != OVSPort.PORT_TUNNEL: + self._update_vif_port(old_port, add=False) + return + + if new_port.ofport == -1: + return + if not old_port or old_port.ofport == -1: + port_type = new_port.get_port_type() + if port_type == OVSPort.PORT_ERROR: + return + elif port_type == OVSPort.PORT_UNKNOWN: + # LOG.info("create external port: %s", new_port) + self._update_external_port(new_port) + else: + # LOG.info("create port: %s", new_port) + if port_type != OVSPort.PORT_TUNNEL: + self._update_vif_port(new_port) + return + if new_port.get_port_type() in (OVSPort.PORT_GUEST, + OVSPort.PORT_GATEWAY, + OVSPort.PORT_VETH_GATEWAY): + # LOG.info("update port: %s", new_port) + self._update_vif_port(new_port) + + +class QuantumAdapter(app_manager.RyuApp): + _CONTEXTS = { + 'conf_switch': conf_switch.ConfSwitchSet, + 'network': network.Network, + 'quantum_ifaces': quantum_ifaces.QuantumIfaces, + } + + def __init__(self, *_args, **kwargs): + super(QuantumAdapter, self).__init__() + + self.cs = kwargs['conf_switch'] + self.nw = kwargs['network'] + self.ifaces = kwargs['quantum_ifaces'] + self.dps = {} + + for network_id in rest_nw_id.RESERVED_NETWORK_IDS: + if network_id == rest_nw_id.NW_ID_UNKNOWN: + continue + self.nw.update_network(network_id) + + def _get_ovs_switch(self, dpid, create=True): + ovs_switch = self.dps.get(dpid) + if not ovs_switch: + if create: + ovs_switch = OVSSwitch(dpid, self.nw, self.ifaces) + self.dps[dpid] = ovs_switch + else: + LOG.debug('ovs switch %s is already known', dpid) + return ovs_switch + + def _port_handler(self, dpid, port_no, port_name, add): + ovs_switch = self._get_ovs_switch(dpid) + if ovs_switch: + ovs_switch.update_port(port_no, port_name, add) + else: + LOG.warn('unknown ovs switch %s %s %s %s\n', + dpid, port_no, port_name, add) + + @handler.set_ev_cls(dpset.EventDP) + def dp_handler(self, ev): + dpid = ev.dp.id + ovs_switch = self._get_ovs_switch(dpid) + if not ovs_switch: + return + + if ev.enter: + for port in ev.ports: + ovs_switch.update_port(port.port_no, port.name, True) + else: + # When dp leaving, we don't delete ports because OF connection + # can be disconnected for some reason. + # TODO: configuration needed to tell that this dp is really + # removed. + ovs_switch = self.dps.pop(dpid, None) + if ovs_switch: + ovs_switch.close() + + @handler.set_ev_cls(dpset.EventPortAdd) + def port_add_handler(self, ev): + port = ev.port + self._port_handler(ev.dp.id, port.port_no, port.name, True) + + @handler.set_ev_cls(dpset.EventPortDelete) + def port_del_handler(self, ev): + port = ev.port + self._port_handler(ev.dp.id, port.port_no, port.name, False) + + def _conf_switch_set_ovsdb_addr(self, dpid, value): + ovs_switch = self._get_ovs_switch(dpid) + ovs_switch.set_ovsdb_addr(dpid, value) + + def _conf_switch_del_ovsdb_addr(self, dpid): + ovs_switch = self._get_ovs_switch(dpid, False) + if ovs_switch: + ovs_switch.set_ovsdb_addr(dpid, None) + + @handler.set_ev_cls(conf_switch.EventConfSwitchSet) + def conf_switch_set_handler(self, ev): + LOG.debug("conf_switch set: %s", ev) + if ev.key == cs_key.OVSDB_ADDR: + self._conf_switch_set_ovsdb_addr(ev.dpid, ev.value) + else: + LOG.debug("unknown event: %s", ev) + + @handler.set_ev_cls(conf_switch.EventConfSwitchDel) + def conf_switch_del_handler(self, ev): + LOG.debug("conf_switch del: %s", ev) + if ev.key == cs_key.OVSDB_ADDR: + self._conf_switch_del_ovsdb_addr(ev.dpid) + else: + LOG.debug("unknown event: %s", ev) + + @handler.set_ev_cls(quantum_ifaces.EventQuantumIfaceSet) + def quantum_iface_set_handler(self, ev): + if ev.key != quantum_ifaces.QuantumIfaces.KEY_NETWORK_ID: + # LOG.debug("unknown key %s", ev.key) + return + iface_id = ev.iface_id + try: + ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS) + except KeyError: + return + for p in ports: + try: + dpid = p[QuantumIfaces.SUBKEY_DATAPATH_ID] + ofport = p[QuantumIfaces.SUBKEY_OFPORT] + port_name = p[QuantumIfaces.SUBKEY_NAME] + except KeyError: + continue + ovs_switch = self._get_ovs_switch(dpid, False) + if ovs_switch: + ovs_switch.update_port(ofport, port_name, True) diff --git a/ryu/flags.py b/ryu/flags.py index e9696208..610b201f 100644 --- a/ryu/flags.py +++ b/ryu/flags.py @@ -23,3 +23,25 @@ FLAGS = gflags.FLAGS # GLOBAL flags gflags.DEFINE_boolean('monkey_patch', False, 'do monkey patch') + +# app/quantum_adapter +gflags.DEFINE_string('quantum_url', 'http://localhost:9696', + 'URL for connecting to quantum') +gflags.DEFINE_integer('quantum_url_timeout', 30, + 'timeout value for connecting to quantum in seconds') +gflags.DEFINE_string('quantum_admin_username', 'quantum', + 'username for connecting to quantum in admin context') +gflags.DEFINE_string('quantum_admin_password', 'service_password', + 'password for connecting to quantum in admin context') +gflags.DEFINE_string('quantum_admin_tenant_name', 'service', + 'tenant name for connecting to quantum in admin context') +gflags.DEFINE_string('quantum_admin_auth_url', 'http://localhost:5000/v2.0', + 'auth url for connecting to quantum in admin context') +gflags.DEFINE_string( + 'quantum_auth_strategy', + 'keystone', + 'auth strategy for connecting to quantum in admin context') + +gflags.DEFINE_string('quantum_controller_addr', None, + 'openflow mehod:address:port to set controller of' + 'ovs bridge') |