summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/app/quantum_adapter.py423
-rw-r--r--ryu/flags.py22
2 files changed, 445 insertions, 0 deletions
diff --git a/ryu/app/quantum_adapter.py b/ryu/app/quantum_adapter.py
new file mode 100644
index 00000000..6c407e7d
--- /dev/null
+++ b/ryu/app/quantum_adapter.py
@@ -0,0 +1,423 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import gflags
+import logging
+
+from quantumclient import client as q_client
+from quantumclient.common import exceptions as q_exc
+from quantumclient.v2_0 import client as q_clientv2
+
+from ryu.app import conf_switch_key as cs_key
+from ryu.app import rest_nw_id
+from ryu.base import app_manager
+from ryu.controller import (conf_switch,
+ dpset,
+ event,
+ handler,
+ network)
+from ryu.lib import dpid as dpid_lib
+from ryu.lib import mac as mac_lib
+from ryu.lib import quantum_ifaces
+from ryu.lib.ovs import bridge
+from ryu.lib.quantum_ifaces import QuantumIfaces
+
+
+from gevent import monkey
+monkey.patch_all()
+
+
+LOG = logging.getLogger(__name__)
+FLAGS = gflags.FLAGS
+
+
+def _get_auth_token():
+ httpclient = q_client.HTTPClient(
+ username=FLAGS.quantum_admin_username,
+ tenant_name=FLAGS.quantum_admin_tenant_name,
+ password=FLAGS.quantum_admin_password,
+ auth_url=FLAGS.quantum_admin_auth_url,
+ timeout=FLAGS.quantum_url_timeout,
+ auth_strategy=FLAGS.quantum_auth_strategy)
+ try:
+ httpclient.authenticate()
+ except (q_exc.Unauthorized, q_exc.Forbidden, q_exc.EndpointNotFound) as e:
+ LOG.error("authentication failure: %s", e)
+ return None
+ # LOG.debug("_get_auth_token: token=%s", httpclient.auth_token)
+ return httpclient.auth_token
+
+
+def _get_quantum_client(token):
+ if token:
+ my_client = q_clientv2.Client(
+ endpoint_url=FLAGS.quantum_url,
+ token=token, timeout=FLAGS.quantum_url_timeout)
+ else:
+ my_client = q_clientv2.Client(
+ endpoint_url=FLAGS.quantum_url,
+ auth_strategy=None, timeout=FLAGS.quantum_url_timeout)
+ return my_client
+
+
+class OVSPort(object):
+ PORT_ERROR = -1
+ PORT_UNKNOWN = 0
+ PORT_GATEWAY = 1
+ PORT_VETH_GATEWAY = 2
+ PORT_GUEST = 3
+ PORT_TUNNEL = 4
+
+ # extra-ids: 'attached-mac', 'iface-id', 'iface-status', 'vm-uuid'
+ def __init__(self, ofport, port_name):
+ super(OVSPort, self).__init__()
+ self.ofport = ofport
+ self.name = port_name
+ self.type = None
+ self.ext_ids = {}
+ self.options = {}
+
+ def update(self, port):
+ self.__dict__.update((key, port[key]) for key
+ in ['name', 'ofport', 'type']
+ if key in port)
+ if 'external_ids' in port:
+ self.ext_ids = dict(port['external_ids'])
+ if 'options' in port:
+ self.options = dict(port['options'])
+
+ def get_port_type(self):
+ if not isinstance(self.ofport, int):
+ return self.PORT_ERROR
+ if self.type == 'internal' and 'iface-id' in self.ext_ids:
+ return self.PORT_GATEWAY
+ if self.type == '' and 'iface-id' in self.ext_ids:
+ return self.PORT_VETH_GATEWAY
+ if (self.type == 'gre' and 'local_ip' in self.options and
+ 'remote_ip' in self.options):
+ return self.PORT_TUNNEL
+ if self.type == '' and 'vm-uuid' in self.ext_ids:
+ return self.PORT_GUEST
+ return self.PORT_UNKNOWN
+
+ def __str__(self):
+ return "type=%s ofport=%s name=%s, ext_ids=%s options=%s" % (
+ self.type, self.ofport, self.name, self.ext_ids, self.options)
+
+ def __eq__(self, other):
+ return (other is not None and
+ self.ofport == other.ofport and
+ self.type == other.type and
+ self.ext_ids == other.ext_ids and
+ self.options == other.options)
+
+
+class OVSSwitch(object):
+ def __init__(self, dpid, nw, ifaces):
+ # TODO: clean up
+ token = None
+ if FLAGS.quantum_auth_strategy:
+ token = _get_auth_token()
+ q_api = _get_quantum_client(token)
+
+ self.dpid = dpid
+ self.network_api = nw
+ self.ifaces = ifaces
+ self.q_api = q_api
+ self.ctrl_addr = FLAGS.quantum_controller_addr
+
+ self.ovsdb_addr = None
+ self.tunnel_ip = None
+
+ self.ovs_bridge = None
+ self.ports = {} # port_no -> OVSPort
+
+ super(OVSSwitch, self).__init__()
+
+ def set_ovsdb_addr(self, dpid, ovsdb_addr):
+ # easy check if the address format valid
+ LOG.debug('set_ovsdb_addr dpid %s ovsdb_addr %s',
+ dpid_lib.dpid_to_str(dpid), ovsdb_addr)
+ _proto, _host, _port = ovsdb_addr.split(':')
+
+ old_address = self.ovsdb_addr
+ if old_address == ovsdb_addr:
+ return
+ if ovsdb_addr is None:
+ # TODO: clean up this ovs switch
+ if self.ovs_bridge:
+ self.ovs_bridge.del_controller()
+ self.ovs_bridge = None
+ return
+ self.ovsdb_addr = ovsdb_addr
+ if self.ovs_bridge is None:
+ LOG.debug('ovsdb: adding ports %s', self.ports)
+ ovs_bridge = bridge.OVSBridge(dpid, ovsdb_addr)
+ self.ovs_bridge = ovs_bridge
+ ovs_bridge.init()
+ # TODO: for multi-controller
+ # not overwrite controllers, but append this controller
+ ovs_bridge.set_controller([self.ctrl_addr])
+ for port in self.ports.values():
+ LOG.debug('adding port %s', port)
+ self.update_port(port.ofport, port.name, True)
+
+ def _update_external_port(self, port, add=True):
+ if add:
+ self.network_api.update_port(rest_nw_id.NW_ID_EXTERNAL,
+ self.dpid, port.ofport)
+ else:
+ self.network_api.remove_port(rest_nw_id.NW_ID_EXTERNAL,
+ self.dpid, port.ofport)
+
+ def _update_vif_port(self, port, add=True):
+ LOG.debug("_update_vif_port: %s %s", port, add)
+ iface_id = port.ext_ids.get('iface-id')
+ if iface_id is None:
+ return
+
+ if not add:
+ ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS)
+ for p in ports:
+ dpid = p.get(QuantumIfaces.SUBKEY_DATAPATH_ID)
+ if dpid is None:
+ continue
+ if dpid != self.dpid:
+ continue
+
+ other_ovs_ports = self.ifaces.del_key(iface_id,
+ QuantumIfaces.KEY_PORTS,
+ p)
+ if other_ovs_ports:
+ # When live-migration, one of the two OVS ports is deleted.
+ return
+
+ port_data = {
+ 'datapath_id': dpid_lib.dpid_to_str(self.dpid),
+ 'port_no': port.ofport,
+
+ # In order to set
+ # port.status = quantum.common.constants.PORT_STATUS_DOWN
+ # port.status can't be changed via rest api directly,
+ # so resort to ryu-specical parameter to tell it.
+ 'deleted': True
+ }
+ body = {'port': port_data}
+ # LOG.debug("port-body = %s", body)
+
+ try:
+ self.q_api.update_port(port.ext_ids['iface-id'], body)
+ except (q_exc.ConnectionFailed, q_exc.QuantumClientException) as e:
+ LOG.error("quantum update port failed: %s", e)
+ # TODO: When authentication failure occurred,
+ # it should get auth token again
+ return
+
+ # update {network, port, mac}
+ try:
+ network_id = self.ifaces.get_key(iface_id,
+ QuantumIfaces.KEY_NETWORK_ID)
+ except KeyError:
+ return
+ self.network_api.update_network(network_id)
+ self.network_api.update_port(network_id, self.dpid, port.ofport)
+ mac = port.ext_ids.get('attached-mac')
+ if mac:
+ self.network_api.update_mac(network_id, self.dpid, port.ofport,
+ mac_lib.haddr_to_bin(mac))
+
+ def update_port(self, port_no, port_name, add):
+ port_name = port_name.rstrip('\x00')
+ LOG.debug('update_port port_no %d %s %s', port_no, port_name, add)
+ assert port_name is not None
+ old_port = self.ports.get(port_no)
+ if not add:
+ new_port = None
+ self.ports.pop(port_no, None)
+ else:
+ new_port = OVSPort(port_no, port_name)
+ if self.ovs_bridge:
+ port_cfg = self.ovs_bridge.get_quantum_ports(port_name)
+ if port_cfg:
+ if 'ofport' not in port_cfg:
+ port_cfg['ofport'] = port_no
+ if port_cfg['ofport'] != port_no:
+ LOG.warn('inconsistent port_no: %d port_cfg %s',
+ port_no, port_cfg)
+ return
+ if port_cfg['name'] != port_name:
+ LOG.warn('inconsistent port_name: %s port_cfg %s',
+ port_name, port_cfg)
+ return
+ new_port.update(port_cfg)
+
+ self.ports[port_no] = new_port
+ iface_id = new_port.ext_ids.get('iface-id')
+ if iface_id:
+ p = {QuantumIfaces.SUBKEY_DATAPATH_ID: self.dpid,
+ QuantumIfaces.SUBKEY_OFPORT: port_no,
+ QuantumIfaces.SUBKEY_NAME: port_name}
+ self.ifaces.update_key(iface_id, QuantumIfaces.KEY_PORTS, p)
+
+ if old_port == new_port:
+ return
+
+ if not new_port:
+ port_type = old_port.get_port_type()
+ if port_type == OVSPort.PORT_ERROR:
+ return
+ elif port_type == OVSPort.PORT_UNKNOWN:
+ # LOG.info("delete external port: %s", old_port)
+ self._update_external_port(old_port, add=False)
+ else:
+ # LOG.info("delete port: %s", old_port)
+ if port_type != OVSPort.PORT_TUNNEL:
+ self._update_vif_port(old_port, add=False)
+ return
+
+ if new_port.ofport == -1:
+ return
+ if not old_port or old_port.ofport == -1:
+ port_type = new_port.get_port_type()
+ if port_type == OVSPort.PORT_ERROR:
+ return
+ elif port_type == OVSPort.PORT_UNKNOWN:
+ # LOG.info("create external port: %s", new_port)
+ self._update_external_port(new_port)
+ else:
+ # LOG.info("create port: %s", new_port)
+ if port_type != OVSPort.PORT_TUNNEL:
+ self._update_vif_port(new_port)
+ return
+ if new_port.get_port_type() in (OVSPort.PORT_GUEST,
+ OVSPort.PORT_GATEWAY,
+ OVSPort.PORT_VETH_GATEWAY):
+ # LOG.info("update port: %s", new_port)
+ self._update_vif_port(new_port)
+
+
+class QuantumAdapter(app_manager.RyuApp):
+ _CONTEXTS = {
+ 'conf_switch': conf_switch.ConfSwitchSet,
+ 'network': network.Network,
+ 'quantum_ifaces': quantum_ifaces.QuantumIfaces,
+ }
+
+ def __init__(self, *_args, **kwargs):
+ super(QuantumAdapter, self).__init__()
+
+ self.cs = kwargs['conf_switch']
+ self.nw = kwargs['network']
+ self.ifaces = kwargs['quantum_ifaces']
+ self.dps = {}
+
+ for network_id in rest_nw_id.RESERVED_NETWORK_IDS:
+ if network_id == rest_nw_id.NW_ID_UNKNOWN:
+ continue
+ self.nw.update_network(network_id)
+
+ def _get_ovs_switch(self, dpid, create=True):
+ ovs_switch = self.dps.get(dpid)
+ if not ovs_switch:
+ if create:
+ ovs_switch = OVSSwitch(dpid, self.nw, self.ifaces)
+ self.dps[dpid] = ovs_switch
+ else:
+ LOG.debug('ovs switch %s is already known', dpid)
+ return ovs_switch
+
+ def _port_handler(self, dpid, port_no, port_name, add):
+ ovs_switch = self._get_ovs_switch(dpid)
+ if ovs_switch:
+ ovs_switch.update_port(port_no, port_name, add)
+ else:
+ LOG.warn('unknown ovs switch %s %s %s %s\n',
+ dpid, port_no, port_name, add)
+
+ @handler.set_ev_cls(dpset.EventDP)
+ def dp_handler(self, ev):
+ dpid = ev.dp.id
+ ovs_switch = self._get_ovs_switch(dpid)
+ if not ovs_switch:
+ return
+
+ if ev.enter:
+ for port in ev.ports:
+ ovs_switch.update_port(port.port_no, port.name, True)
+ else:
+ # When dp leaving, we don't delete ports because OF connection
+ # can be disconnected for some reason.
+ # TODO: configuration needed to tell that this dp is really
+ # removed.
+ ovs_switch = self.dps.pop(dpid, None)
+ if ovs_switch:
+ ovs_switch.close()
+
+ @handler.set_ev_cls(dpset.EventPortAdd)
+ def port_add_handler(self, ev):
+ port = ev.port
+ self._port_handler(ev.dp.id, port.port_no, port.name, True)
+
+ @handler.set_ev_cls(dpset.EventPortDelete)
+ def port_del_handler(self, ev):
+ port = ev.port
+ self._port_handler(ev.dp.id, port.port_no, port.name, False)
+
+ def _conf_switch_set_ovsdb_addr(self, dpid, value):
+ ovs_switch = self._get_ovs_switch(dpid)
+ ovs_switch.set_ovsdb_addr(dpid, value)
+
+ def _conf_switch_del_ovsdb_addr(self, dpid):
+ ovs_switch = self._get_ovs_switch(dpid, False)
+ if ovs_switch:
+ ovs_switch.set_ovsdb_addr(dpid, None)
+
+ @handler.set_ev_cls(conf_switch.EventConfSwitchSet)
+ def conf_switch_set_handler(self, ev):
+ LOG.debug("conf_switch set: %s", ev)
+ if ev.key == cs_key.OVSDB_ADDR:
+ self._conf_switch_set_ovsdb_addr(ev.dpid, ev.value)
+ else:
+ LOG.debug("unknown event: %s", ev)
+
+ @handler.set_ev_cls(conf_switch.EventConfSwitchDel)
+ def conf_switch_del_handler(self, ev):
+ LOG.debug("conf_switch del: %s", ev)
+ if ev.key == cs_key.OVSDB_ADDR:
+ self._conf_switch_del_ovsdb_addr(ev.dpid)
+ else:
+ LOG.debug("unknown event: %s", ev)
+
+ @handler.set_ev_cls(quantum_ifaces.EventQuantumIfaceSet)
+ def quantum_iface_set_handler(self, ev):
+ if ev.key != quantum_ifaces.QuantumIfaces.KEY_NETWORK_ID:
+ # LOG.debug("unknown key %s", ev.key)
+ return
+ iface_id = ev.iface_id
+ try:
+ ports = self.ifaces.get_key(iface_id, QuantumIfaces.KEY_PORTS)
+ except KeyError:
+ return
+ for p in ports:
+ try:
+ dpid = p[QuantumIfaces.SUBKEY_DATAPATH_ID]
+ ofport = p[QuantumIfaces.SUBKEY_OFPORT]
+ port_name = p[QuantumIfaces.SUBKEY_NAME]
+ except KeyError:
+ continue
+ ovs_switch = self._get_ovs_switch(dpid, False)
+ if ovs_switch:
+ ovs_switch.update_port(ofport, port_name, True)
diff --git a/ryu/flags.py b/ryu/flags.py
index e9696208..610b201f 100644
--- a/ryu/flags.py
+++ b/ryu/flags.py
@@ -23,3 +23,25 @@ FLAGS = gflags.FLAGS
# GLOBAL flags
gflags.DEFINE_boolean('monkey_patch', False, 'do monkey patch')
+
+# app/quantum_adapter
+gflags.DEFINE_string('quantum_url', 'http://localhost:9696',
+ 'URL for connecting to quantum')
+gflags.DEFINE_integer('quantum_url_timeout', 30,
+ 'timeout value for connecting to quantum in seconds')
+gflags.DEFINE_string('quantum_admin_username', 'quantum',
+ 'username for connecting to quantum in admin context')
+gflags.DEFINE_string('quantum_admin_password', 'service_password',
+ 'password for connecting to quantum in admin context')
+gflags.DEFINE_string('quantum_admin_tenant_name', 'service',
+ 'tenant name for connecting to quantum in admin context')
+gflags.DEFINE_string('quantum_admin_auth_url', 'http://localhost:5000/v2.0',
+ 'auth url for connecting to quantum in admin context')
+gflags.DEFINE_string(
+ 'quantum_auth_strategy',
+ 'keystone',
+ 'auth strategy for connecting to quantum in admin context')
+
+gflags.DEFINE_string('quantum_controller_addr', None,
+ 'openflow mehod:address:port to set controller of'
+ 'ovs bridge')