summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJason Kölker <jason@koelker.net>2015-07-24 17:58:02 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-07-30 16:54:46 +0900
commitaa198d6900e2044ad940f4a1e89b3e7e0174c8c1 (patch)
tree3f0df4154e88a9380a659000a08280bf43524dab
parent0de43f7b6038a0045c3f5f46fa74a6b2a394c112 (diff)
Add OVSDB manager protocol application
Allows listening on a socket for OVSDB clients, reacting to their events and modifying their database. Co-Authored-By: Chris Hansen <chris.hansen.career@gmail.com> Co-Authored-By: Ravi Kamachi <ravi.kamachi@rackspace.com> Signed-off-by: Jason Kölker <jason@koelker.net> Signed-off-by: Chris Hansen <chris.hansen.career@gmail.com> Signed-off-by: Ravi Kamachi <ravi.kamachi@rackspace.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--doc/source/library.rst1
-rw-r--r--doc/source/library_ovsdb_manager.rst61
-rw-r--r--ryu/services/protocols/ovsdb/__init__.py14
-rw-r--r--ryu/services/protocols/ovsdb/api.py137
-rw-r--r--ryu/services/protocols/ovsdb/client.py336
-rw-r--r--ryu/services/protocols/ovsdb/event.py180
-rw-r--r--ryu/services/protocols/ovsdb/manager.py149
-rw-r--r--ryu/services/protocols/ovsdb/model.py44
8 files changed, 922 insertions, 0 deletions
diff --git a/doc/source/library.rst b/doc/source/library.rst
index 38cc3872..bc8ff67f 100644
--- a/doc/source/library.rst
+++ b/doc/source/library.rst
@@ -12,3 +12,4 @@ Ryu provides some useful library for your network applications.
library_of_config.rst
library_bgp_speaker.rst
library_bgp_speaker_ref.rst
+ library_ovsdb_manager.rst
diff --git a/doc/source/library_ovsdb_manager.rst b/doc/source/library_ovsdb_manager.rst
new file mode 100644
index 00000000..b23ae81d
--- /dev/null
+++ b/doc/source/library_ovsdb_manager.rst
@@ -0,0 +1,61 @@
+*********************
+OVSDB Manager library
+*********************
+
+Introduction
+============
+
+Ryu OVSDB Manager library allows your code to interact with devices
+speaking the OVSDB protocol. This enables your code to perform remote
+management of the devices and react to topology changes on them.
+
+Example
+=======
+
+The following logs all new OVSDB connections and allows creating a port
+on a bridge.
+
+.. code-block:: python
+
+ import uuid
+
+ from ryu.base import app_manager
+ from ryu.services.protocols.ovsdb import api as ovsdb
+ from ryu.services.protocols.ovsdb import event as ovsdb_event
+
+
+ class MyApp(app_manager.RyuApp):
+ @set_ev_cls(ovsdb_event.EventNewOVSDBConnection)
+ def handle_new_ovsdb_connection(self, ev):
+ system_id = ev.system_id
+ self.logger.info('New OVSDB connection from system id %s',
+ systemd_id)
+
+ def create_port(self, systemd_id, bridge_name, name):
+ new_iface_uuid = uuid.uuid4()
+ new_port_uuid = uuid.uuid4()
+
+ def _create_port(tables, insert):
+ bridge = ovsdb.row_by_name(self, system_id, bridge_name)
+
+ iface = insert(tables['Interface'], new_iface_uuid)
+ iface.name = name
+ iface.type = 'internal'
+
+ port = insert(tables['Port'], new_port_uuid)
+ port.name = name
+ port.interfaces = [iface]
+
+ brdige.ports = bridfe.ports + [port]
+
+ return (new_port_uuid, new_iface_uuid)
+
+ req = ovsdb_event.EventModifyRequest(system_id, _create_port)
+ rep = self.send_request(req)
+
+ if rep.status != 'success':
+ self.logger.error('Error creating port %s on bridge %s: %s',
+ name, bridge, rep.status)
+ return None
+
+ return reply.insert_uuid[new_port_uuid]
diff --git a/ryu/services/protocols/ovsdb/__init__.py b/ryu/services/protocols/ovsdb/__init__.py
new file mode 100644
index 00000000..fb3d4547
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/__init__.py
@@ -0,0 +1,14 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/ryu/services/protocols/ovsdb/api.py b/ryu/services/protocols/ovsdb/api.py
new file mode 100644
index 00000000..ea73cbf2
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/api.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ryu.lib import dpid as dpidlib
+from ryu.services.protocols.ovsdb import event as ovsdb_event
+
+
+def match_row(manager, system_id, table, fn):
+ def _match_row(tables):
+ return next((r for r in tables[table].rows.values()
+ if fn(r)), None)
+
+ request_to_get_tables = ovsdb_event.EventReadRequest(system_id,
+ _match_row)
+ reply_to_get_tables = manager.send_request(request_to_get_tables)
+ return reply_to_get_tables.result
+
+
+def match_rows(manager, system_id, table, fn):
+ def _match_rows(tables):
+ return (r for r in tables[table].rows.values() if fn(r))
+
+ request = ovsdb_event.EventReadRequest(system_id, _match_rows)
+ reply = manager.send_request(request)
+ return reply.result
+
+
+def row_by_name(manager, system_id, name, table='Bridge', fn=None):
+ matched_row = match_row(manager, system_id, table,
+ lambda row: row.name == name)
+
+ if fn is not None:
+ return fn(matched_row)
+
+ return matched_row
+
+
+def get_column_value(manager, table, record, column):
+ """
+ Example : To get datapath_id from Bridge table
+ get_column_value('Bridge', <bridge name>, 'datapath_id').strip('"')
+ """
+ row = row_by_name(manager, record, table)
+ value = getattr(row, column, "")
+
+ if isinstance(value, list) and len(value) == 1:
+ value = value[0]
+
+ return str(value)
+
+
+def get_iface_by_name(manager, system_id, name, fn=None):
+ iface = row_by_name(manager, system_id, name, 'Interface')
+
+ if fn is not None:
+ return fn(iface)
+
+ return iface
+
+
+def get_bridge_for_iface_name(manager, system_id, iface_name, fn=None):
+ iface = row_by_name(manager, system_id, iface_name, 'Interface')
+ port = match_row(manager, system_id, 'Port',
+ lambda x: iface in x.interfaces)
+ bridge = match_row(manager, system_id, 'Bridge',
+ lambda x: port in x.ports)
+
+ if fn is not None:
+ return fn(bridge)
+
+ return bridge
+
+
+def get_table(manager, system_id, name):
+ def _get_table(tables):
+ return tables[name]
+
+ request_to_get_tables = ovsdb_event.EventReadRequest(system_id,
+ _get_table)
+ reply_to_get_tables = manager.send_request(request_to_get_tables)
+ return reply_to_get_tables.result
+
+
+def get_bridge_by_datapath_id(manager, system_id, datapath_id, fn=None):
+ def _match_fn(row):
+ row_dpid = dpidlib.str_to_dpid(str(row.datapath_id[0]))
+ return row_dpid == datapath_id
+
+ bridge = match_row(manager, system_id, 'Bridge', _match_fn)
+
+ if fn is not None:
+ return fn(bridge)
+
+ return bridge
+
+
+def get_datapath_ids_for_systemd_id(manager, system_id):
+ def _get_dp_ids(tables):
+ dp_ids = []
+
+ bridges = tables.get('Bridge')
+
+ if not bridges:
+ return dp_ids
+
+ for bridge in bridges.rows.values():
+ datapath_ids = bridge.datapath_id
+ dp_ids.extend(dpidlib.str_to_dpid(dp_id) for dp_id in datapath_ids)
+
+ return dp_ids
+
+ request = ovsdb_event.EventReadRequest(system_id, _get_dp_ids)
+ reply = manager.send_request(request)
+ return reply.result
+
+
+def get_bridges_by_system_id(manager, system_id):
+ return get_table(manager, system_id, 'Bridge').rows.values()
+
+
+def bridge_exists(manager, system_id, bridge_name):
+ return bool(row_by_name(manager, system_id, bridge_name))
+
+
+def port_exists(manager, system_id, port_name):
+ return bool(row_by_name(manager, system_id, port_name, 'Port'))
diff --git a/ryu/services/protocols/ovsdb/client.py b/ryu/services/protocols/ovsdb/client.py
new file mode 100644
index 00000000..175936ca
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/client.py
@@ -0,0 +1,336 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import logging
+import uuid
+
+# NOTE(jkoelker) Patch Vlog so that is uses standard logging
+from ovs import vlog
+
+
+class Vlog(vlog.Vlog):
+ def __init__(self, name):
+ self.log = logging.getLogger('ovs.%s' % name)
+
+ def __log(self, level, message, **kwargs):
+ level = vlog.LEVELS.get(level, logging.DEBUG)
+ self.log.log(level, message, **kwargs)
+
+vlog.Vlog = Vlog
+
+
+from ovs import jsonrpc
+from ovs import reconnect
+from ovs import stream
+from ovs import timeval
+from ovs.db import idl
+
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.services.protocols.ovsdb import event
+from ryu.services.protocols.ovsdb import model
+
+
+now = timeval.msec
+
+
+def _uuid_to_row(atom, base):
+ if base.ref_table:
+ value = base.ref_table.rows.get(atom)
+ else:
+ value = atom
+
+ if isinstance(value, idl.Row):
+ value = str(value.uuid)
+
+ return value
+
+
+def dictify(row):
+ if row is None:
+ return
+
+ return dict([(k, v.to_python(_uuid_to_row))
+ for k, v in row._data.items()])
+
+
+def discover_schemas(connection):
+ # NOTE(jkoelker) currently only the Open_vSwitch schema
+ # is supported.
+ # TODO(jkoelker) support arbitrary schemas
+ req = jsonrpc.Message.create_request('list_dbs', [])
+ error, reply = connection.transact_block(req)
+
+ if error or reply.error:
+ return
+
+ schemas = []
+ for db in reply.result:
+ if db != 'Open_vSwitch':
+ continue
+
+ req = jsonrpc.Message.create_request('get_schema', [db])
+ error, reply = connection.transact_block(req)
+
+ if error or reply.error:
+ # TODO(jkoelker) Error handling
+ continue
+
+ schemas.append(reply.result)
+
+ return schemas
+
+
+def discover_system_id(idl):
+ system_id = None
+
+ while system_id is None:
+ idl.run()
+ openvswitch = idl.tables['Open_vSwitch'].rows
+
+ if openvswitch:
+ row = openvswitch.get(list(openvswitch.keys())[0])
+ system_id = row.external_ids.get('system-id')
+
+ return system_id
+
+
+# NOTE(jkoelker) Wrap ovs's Idl to accept an existing session, and
+# trigger callbacks on changes
+class Idl(idl.Idl):
+ def __init__(self, session, schema):
+ if not isinstance(schema, idl.SchemaHelper):
+ schema = idl.SchemaHelper(schema_json=schema)
+ schema.register_all()
+
+ schema = schema.get_idl_schema()
+
+ # NOTE(jkoelker) event buffer
+ self._events = []
+
+ self.tables = schema.tables
+ self._db = schema
+ self._session = session
+ self._monitor_request_id = None
+ self._last_seqno = None
+ self.change_seqno = 0
+
+ # Database locking.
+ self.lock_name = None # Name of lock we need, None if none.
+ self.has_lock = False # Has db server said we have the lock?
+ self.is_lock_contended = False # Has db server said we can't get lock?
+ self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
+
+ # Transaction support.
+ self.txn = None
+ self._outstanding_txns = {}
+
+ for table in schema.tables.values():
+ for column in table.columns.values():
+ if not hasattr(column, 'alert'):
+ column.alert = True
+ table.need_table = False
+ table.rows = {}
+ table.idl = self
+
+ @property
+ def events(self):
+ events = self._events
+ self._events = []
+ return events
+
+ def __process_update(self, table, uuid, old, new):
+ old_row = table.rows.get(uuid)
+ if old_row is not None:
+ old_row = model.Row(dictify(old_row))
+ old_row['_uuid'] = uuid
+
+ changed = idl.Idl.__process_update(self, table, uuid, old, new)
+
+ if changed:
+ if not new:
+ ev = (event.EventRowDelete, (table.name, old_row))
+
+ elif not old:
+ new_row = model.Row(dictify(table.rows.get(uuid)))
+ new_row['_uuid'] = uuid
+ ev = (event.EventRowInsert, (table.name, new_row))
+
+ else:
+ new_row = model.Row(dictify(table.rows.get(uuid)))
+ new_row['_uuid'] = uuid
+
+ ev = (event.EventRowUpdate, (table.name, old_row, new_row))
+
+ self._events.append(ev)
+
+ return changed
+
+
+class RemoteOvsdb(app_manager.RyuApp):
+ _EVENTS = [event.EventRowUpdate,
+ event.EventRowDelete,
+ event.EventRowInsert,
+ event.EventInterfaceDeleted,
+ event.EventInterfaceInserted,
+ event.EventInterfaceUpdated,
+ event.EventPortDeleted,
+ event.EventPortInserted,
+ event.EventPortUpdated]
+
+ @classmethod
+ def factory(cls, sock, address, *args, **kwargs):
+ ovs_stream = stream.Stream(sock, None, None)
+ connection = jsonrpc.Connection(ovs_stream)
+ schemas = discover_schemas(connection)
+
+ if not schemas:
+ return
+
+ fsm = reconnect.Reconnect(now())
+ fsm.set_name('%s:%s' % address)
+ fsm.enable(now())
+ fsm.set_passive(True, now())
+ fsm.set_max_tries(-1)
+ fsm.connected(now())
+
+ session = jsonrpc.Session(fsm, connection)
+ idl = Idl(session, schemas[0])
+
+ system_id = discover_system_id(idl)
+ name = cls.instance_name(system_id)
+ ovs_stream.name = name
+ connection.name = name
+ fsm.set_name(name)
+
+ kwargs = kwargs.copy()
+ kwargs['address'] = address
+ kwargs['idl'] = idl
+ kwargs['name'] = name
+ kwargs['system_id'] = system_id
+
+ app_mgr = app_manager.AppManager.get_instance()
+ return app_mgr.instantiate(cls, *args, **kwargs)
+
+ @classmethod
+ def instance_name(cls, system_id):
+ return '%s-%s' % (cls.__name__, system_id)
+
+ def __init__(self, *args, **kwargs):
+ super(RemoteOvsdb, self).__init__(*args, **kwargs)
+ self.address = kwargs['address']
+ self._idl = kwargs['idl']
+ self.system_id = kwargs['system_id']
+ self.name = kwargs['name']
+ self._txn_q = collections.deque()
+
+ def _event_proxy_loop(self):
+ while self.is_active:
+ events = self._idl.events
+
+ if not events:
+ hub.sleep(0.1)
+ continue
+
+ for event in events:
+ ev = event[0]
+ args = event[1]
+ self._submit_event(ev(self.system_id, *args))
+
+ hub.sleep(0)
+
+ def _submit_event(self, ev):
+ self.send_event_to_observers(ev)
+ try:
+ ev_cls_name = 'Event' + ev.table + ev.event_type
+ proxy_ev_cls = getattr(event, ev_cls_name, None)
+ if proxy_ev_cls:
+ self.send_event_to_observers(proxy_ev_cls(ev))
+ except Exception:
+ self.logger.exception('Error submitting specific event for OVSDB',
+ self.system_id)
+
+ def _idl_loop(self):
+ while self.is_active:
+ try:
+ self._idl.run()
+ self._transactions()
+ except Exception:
+ self.logger.exception('Error running IDL for system_id %s' %
+ self.system_id)
+ break
+
+ hub.sleep(0)
+
+ def _run_thread(self, func, *args, **kwargs):
+ try:
+ func(*args, **kwargs)
+
+ finally:
+ self.stop()
+
+ def _transactions(self):
+ if not self._txn_q:
+ return
+
+ # NOTE(jkoelker) possibly run multiple transactions per loop?
+ self._transaction()
+
+ def _transaction(self):
+ req = self._txn_q.popleft()
+ txn = idl.Transaction(self._idl)
+
+ uuids = req.func(self._idl.tables, txn.insert)
+ status = txn.commit_block()
+
+ insert_uuids = {}
+ err_msg = None
+
+ if status in (idl.Transaction.SUCCESS,
+ idl.Transaction.UNCHANGED):
+ if uuids:
+ if isinstance(uuids, uuid.UUID):
+ insert_uuids[uuids] = txn.get_insert_uuid(uuids)
+
+ else:
+ insert_uuids = dict((uuid, txn.get_insert_uuid(uuid))
+ for uuid in uuids)
+ else:
+ err_msg = txn.get_error()
+
+ rep = event.EventModifyReply(self.system_id, status, insert_uuids,
+ err_msg)
+ self.reply_to_request(req, rep)
+
+ def modify_request_handler(self, ev):
+ self._txn_q.append(ev)
+
+ def read_request_handler(self, ev):
+ result = ev.func(self._idl.tables)
+ rep = event.EventReadReply(self.system_id, result)
+ self.reply_to_request(ev, rep)
+
+ def start(self):
+ super(RemoteOvsdb, self).start()
+ t = hub.spawn(self._run_thread, self._idl_loop)
+ self.threads.append(t)
+
+ t = hub.spawn(self._run_thread, self._event_proxy_loop)
+ self.threads.append(t)
+
+ def stop(self):
+ super(RemoteOvsdb, self).stop()
+ self._idl.close()
diff --git a/ryu/services/protocols/ovsdb/event.py b/ryu/services/protocols/ovsdb/event.py
new file mode 100644
index 00000000..2353a4ff
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/event.py
@@ -0,0 +1,180 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ryu.controller import event as ryu_event
+from ryu.controller import handler
+
+
+class EventRowBase(ryu_event.EventBase):
+ def __init__(self, system_id, table, row, event_type):
+ super(EventRowBase, self).__init__()
+ self.system_id = system_id
+ self.table = table
+ self.row = row
+ self.event_type = event_type
+
+ def __str__(self):
+ return '%s<system_id=%s table=%s, uuid=%s>' % (self.__class__.__name__,
+ self.system_id,
+ self.table,
+ self.row['_uuid'])
+
+
+class EventRowDelete(EventRowBase):
+ def __init__(self, system_id, table, row):
+ super(EventRowDelete, self).__init__(system_id, table, row, 'Deleted')
+
+
+class EventRowInsert(EventRowBase):
+ def __init__(self, system_id, table, row):
+ super(EventRowInsert, self).__init__(system_id, table, row, 'Inserted')
+
+
+class EventRowUpdate(ryu_event.EventBase):
+ def __init__(self, system_id, table, old, new):
+ super(EventRowUpdate, self).__init__()
+ self.system_id = system_id
+ self.table = table
+ self.old = old
+ self.new = new
+ self.event_type = 'Updated'
+
+ def __str__(self):
+ return '%s<system_id=%s table=%s, uuid=%s>' % (self.__class__.__name__,
+ self.system_id,
+ self.table,
+ self.old['_uuid'])
+
+
+class EventModifyRequest(ryu_event.EventRequestBase):
+ """ Dispatch a modify function to OVSDB
+
+ `func` must be a callable that accepts an insert fucntion and the
+ IDL.tables object. It can then modify the tables as needed. For inserts,
+ specify a UUID for each insert, and return a tuple of the temporary
+ UUID's. The execution of `func` will be wrapped in a single transaction
+ and the reply will include a dict of temporary UUID to real UUID mappings.
+
+ e.g.
+
+ new_port_uuid = uuid.uuid4()
+
+ def modify(tables, insert):
+ bridges = tables['Bridge'].rows
+ bridge = None
+ for b in bridges:
+ if b.name == 'my-bridge':
+ bridge = b
+
+ if not bridge:
+ return
+
+ port = insert('Port', new_port_uuid)
+
+ bridge.ports = bridge.ports + [port]
+
+ return (new_port_uuid, )
+
+ request = EventModifyRequest(system_id, modify)
+ reply = send_request(request)
+
+ port_uuid = reply.insert_uuids[new_port_uuid]
+ """
+ def __init__(self, system_id, func):
+ super(EventModifyRequest, self).__init__()
+ self.dst = 'OVSDB'
+ self.system_id = system_id
+ self.func = func
+
+
+class EventModifyReply(ryu_event.EventReplyBase):
+ def __init__(self, system_id, status, insert_uuids, err_msg):
+ self.system_id = system_id
+ self.status = status
+ self.insert_uuids = insert_uuids
+ self.err_msg = err_msg
+
+
+class EventNewOVSDBConnection(ryu_event.EventBase):
+ def __init__(self, system_id):
+ super(EventNewOVSDBConnection, self).__init__()
+ self.system_id = system_id
+
+ def __str__(self):
+ return '%s<system_id=%s>' % (self.__class__.__name__,
+ self.system_id)
+
+
+class EventReadRequest(ryu_event.EventRequestBase):
+ def __init__(self, system_id, func):
+ self.system_id = system_id
+ self.func = func
+ self.dst = 'OVSDB'
+
+
+class EventReadReply(ryu_event.EventReplyBase):
+ def __init__(self, system_id, result, err_msg=''):
+ self.system_id = system_id
+ self.result = result
+ self.err_msg = err_msg
+
+
+class EventRowInsertedBase(EventRowInsert):
+ def __init__(self, ev):
+ super(EventRowInsertedBase, self).__init__(ev.system_id,
+ ev.table,
+ ev.row)
+
+
+class EventRowDeletedBase(EventRowDelete):
+ def __init__(self, ev):
+ super(EventRowDeletedBase, self).__init__(ev.system_id,
+ ev.table,
+ ev.row)
+
+
+class EventRowUpdatedBase(EventRowUpdate):
+ def __init__(self, ev):
+ super(EventRowUpdatedBase, self).__init__(ev.system_id,
+ ev.table,
+ ev.old,
+ ev.new)
+
+
+class EventPortInserted(EventRowInsertedBase):
+ pass
+
+
+class EventPortDeleted(EventRowDeletedBase):
+ pass
+
+
+class EventPortUpdated(EventRowUpdatedBase):
+ pass
+
+
+class EventInterfaceInserted(EventRowInsertedBase):
+ pass
+
+
+class EventInterfaceDeleted(EventRowDeletedBase):
+ pass
+
+
+class EventInterfaceUpdated(EventRowUpdatedBase):
+ pass
+
+
+handler.register_service('ryu.services.protocols.ovsdb.manager')
diff --git a/ryu/services/protocols/ovsdb/manager.py b/ryu/services/protocols/ovsdb/manager.py
new file mode 100644
index 00000000..b34fb7d3
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/manager.py
@@ -0,0 +1,149 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ssl
+import socket
+
+from ryu import cfg
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.services.protocols.ovsdb import client
+from ryu.services.protocols.ovsdb import event
+from ryu.controller import handler
+
+
+opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'),
+ cfg.IntOpt('port', default=6640, help='OVSDB port'),
+ cfg.StrOpt('mngr-privkey', default=None, help='manager private key'),
+ cfg.StrOpt('mngr-cert', default=None, help='manager certificate'),
+ cfg.ListOpt('whitelist', default=[],
+ help='Whitelist of address to allow to connect'))
+
+cfg.CONF.register_opts(opts, 'ovsdb')
+
+
+class OVSDB(app_manager.RyuApp):
+ _EVENTS = [event.EventNewOVSDBConnection,
+ event.EventModifyRequest,
+ event.EventReadRequest]
+
+ def __init__(self, *args, **kwargs):
+ super(OVSDB, self).__init__(*args, **kwargs)
+ self._address = self.CONF.ovsdb.address
+ self._port = self.CONF.ovsdb.port
+ self._clients = {}
+
+ def _accept(self, server):
+ if self.CONF.ovsdb.whitelist:
+ def check(address):
+ if address in self.CONF.ovsdb.whitelist:
+ return True
+
+ self.logger.debug('Connection from non-whitelist client '
+ '(%s:%s)' % address)
+ return False
+
+ else:
+ def check(address):
+ return True
+
+ while True:
+ # TODO(jkoelker) SSL Certificate Fingerprint check
+ sock, client_address = server.accept()
+
+ if not check(client_address[0]):
+ sock.shutdown(socket.SHUT_RDWR)
+ sock.close()
+ continue
+
+ self.logger.debug('New connection from %s:%s' % client_address)
+ t = hub.spawn(self._start_remote, sock, client_address)
+ self.threads.append(t)
+
+ def _proxy_event(self, ev):
+ system_id = ev.system_id
+ client_name = client.RemoteOvsdb.instance_name(system_id)
+
+ if client_name not in self._clients:
+ self.logger.info('Unknown remote system_id %s' % system_id)
+ return
+
+ return self.send_event(client_name, ev)
+
+ def _start_remote(self, sock, client_address):
+ app = client.RemoteOvsdb.factory(sock, client_address)
+
+ if app:
+ self._clients[app.name] = app
+ app.start()
+ ev = event.EventNewOVSDBConnection(app.system_id)
+ self.send_event_to_observers(ev)
+
+ def start(self):
+ server = hub.listen((self._address, self._port))
+ key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
+ cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert
+
+ if key is not None and cert is not None:
+ ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True)
+
+ if self.CONF.ca_certs is not None:
+ ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+ ssl_kwargs['ca_certs'] = self.CONF.ca_certs
+
+ server = ssl.wrap_socket(server, **ssl_kwargs)
+
+ self._server = server
+
+ self.logger.info('Listening on %s:%s for clients' % (self._address,
+ self._port))
+ t = hub.spawn(self._accept, self._server)
+ super(OVSDB, self).start()
+ return t
+
+ def stop(self):
+ for client in self._clients.values():
+ client.stop()
+
+ super(OVSDB, self).stop()
+
+ @handler.set_ev_cls(event.EventModifyRequest)
+ def modify_request_handler(self, ev):
+
+ system_id = ev.system_id
+ client_name = client.RemoteOvsdb.instance_name(system_id)
+ remote = self._clients.get(client_name)
+
+ if not remote:
+ msg = 'Unknown remote system_id %s' % system_id
+ self.logger.info(msg)
+ rep = event.EventModifyReply(system_id, None, None, msg)
+ return self.reply_to_request(ev, rep)
+
+ return remote.modify_request_handler(ev)
+
+ @handler.set_ev_cls(event.EventReadRequest)
+ def read_request_handler(self, ev):
+ system_id = ev.system_id
+ client_name = client.RemoteOvsdb.instance_name(system_id)
+ remote = self._clients.get(client_name)
+
+ if not remote:
+ msg = 'Unknown remote system_id %s' % system_id
+ self.logger.info(msg)
+ rep = event.EventReadReply(self.system_id, None, msg)
+ return self.reply_to_request(ev, rep)
+
+ return remote.read_request_handler(ev)
diff --git a/ryu/services/protocols/ovsdb/model.py b/ryu/services/protocols/ovsdb/model.py
new file mode 100644
index 00000000..992c785f
--- /dev/null
+++ b/ryu/services/protocols/ovsdb/model.py
@@ -0,0 +1,44 @@
+# Copyright (c) 2014 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+
+class _UUIDDict(dict):
+ def _uuidize(self):
+ if '_uuid' not in self or self['_uuid'] is None:
+ self['_uuid'] = uuid.uuid4()
+
+ @property
+ def uuid(self):
+ self._uuidize()
+ return self['_uuid']
+
+ @uuid.setter
+ def uuid(self, value):
+ self['_uuid'] = value
+
+
+class Row(_UUIDDict):
+ @property
+ def delete(self):
+ if '_delete' in self and self['_delete']:
+ return True
+
+ return False
+
+ @delete.setter
+ def delete(self, value):
+ self['_delete'] = value