summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/lib/ovs/vsctl.py1341
1 files changed, 1341 insertions, 0 deletions
diff --git a/ryu/lib/ovs/vsctl.py b/ryu/lib/ovs/vsctl.py
new file mode 100644
index 00000000..19bc067e
--- /dev/null
+++ b/ryu/lib/ovs/vsctl.py
@@ -0,0 +1,1341 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import gevent
+import itertools
+import logging
+import os
+import weakref
+
+import ovs.db.data
+import ovs.db.types
+import ovs.poller
+from ovs import (jsonrpc,
+ ovsuuid,
+ stream)
+from ovs.db import idl
+
+from ryu.lib.ovs import vswitch_idl
+
+LOG = logging.getLogger(__name__) # use ovs.vlog?
+
+
+# for debug
+def ovsrec_row_changes_to_string(ovsrec_row):
+ if not ovsrec_row._changes:
+ return ovsrec_row._changes
+
+ return dict((key, value.to_string())
+ for key, value in ovsrec_row._changes.items())
+
+
+# for debug
+def ovsrec_row_to_string(ovsrec_row):
+ output = ''
+ output += 'uuid: %s ' % ovsrec_row.uuid
+ if ovsrec_row._data:
+ output += '_data: %s ' % dict((key, value.to_string()) for key, value
+ in ovsrec_row._data.items())
+ else:
+ output += '_data: %s ' % ovsrec_row._data
+ output += '_changes: %s' % ovsrec_row_changes_to_string(ovsrec_row)
+ return output
+
+
+def atom_from_string(base, value_string, symtab=None):
+ type_ = base.type
+ atom = None
+ if type_ == ovs.db.types.IntegerType:
+ atom = ovs.db.data.Atom(type_, int(value_string))
+ elif type_ == ovs.db.types.RealType:
+ # TODO:XXX negation
+ atom = ovs.db.data.Atom(
+ type_, ovs.db.parser.float_to_int(float(value_string)))
+ elif type_ == ovs.db.types.BooleanType:
+ if value_string in ("true", "yes", "on", "1"):
+ atom = ovs.db.data.Atom(type_, True)
+ elif value_string == ("false", "no", "off", "0"):
+ atom = ovs.db.data.Atom(type_, False)
+ elif type_ == ovs.db.types.StringType:
+ # TODO:XXXX escape: if value_string[0] == '"':
+ atom = ovs.db.data.Atom(type_, value_string)
+ elif type_ == ovs.db.types.UuidType:
+ if value_string[0] == "@":
+ assert symtab is not None
+ uuid_ = symtab[value_string]
+ atom = ovs.db.data.Atom(type_, uuid_)
+ else:
+ atom = ovs.db.data.Atom(type_,
+ ovs.ovsuuid.from_string(value_string))
+ if atom is None:
+ raise ValueError("expected %s" % type_.to_string(), value_string)
+ atom.check_constraints(base)
+ return atom
+
+
+def datum_from_string(type_, value_string, symtab=None):
+ value_string = value_string.strip()
+ if type_.is_map():
+ if value_string.startswith('{'):
+ # TODO:dict case
+ LOG.debug('value_string %s', value_string)
+ raise NotImplementedError()
+ d = dict(v.split('=', 1) for v in value_string.split(','))
+ d = dict((atom_from_string(type_.key, key, symtab),
+ atom_from_string(type_.value, value, symtab))
+ for key, value in d.items())
+ elif type_.is_set():
+ if value_string.startswith('['):
+ # TODO:set case
+ LOG.debug('value_string %s', value_string)
+ raise NotImplementedError()
+ values = value_string.split(',')
+ d = dict((atom_from_string(type_.key, value, symtab), None)
+ for value in values)
+ else:
+ atom = atom_from_string(type_.key, value_string, symtab)
+ d = {atom: None}
+
+ datum = ovs.db.data.Datum(type_, d)
+ return datum.to_json()
+
+
+def ifind(pred, seq):
+ try:
+ return itertools.ifilter(pred, seq).next()
+ except StopIteration:
+ return None
+
+
+def not_reached():
+ os.abort()
+
+
+def vsctl_fatal(msg):
+ LOG.error(msg)
+ raise Exception(msg) # not call ovs.utils.ovs_fatal for reusability
+
+
+class VSCtlBridge(object):
+ def __init__(self, ovsrec_bridge, name, parent, vlan):
+ super(VSCtlBridge, self).__init__()
+ self.br_cfg = ovsrec_bridge
+ self.name = name
+ self.ports = set()
+
+ self.parent = parent
+ self.vlan = vlan
+ self.children = set() # WeakSet is needed?
+
+ def find_vlan_bridge(self, vlan):
+ return ifind(lambda child: child.vlan == vlan, self.children)
+
+
+class VSCtlPort(object):
+ def __init__(self, vsctl_bridge_parent, ovsrec_port):
+ super(VSCtlPort, self).__init__()
+ self.bridge = weakref.ref(vsctl_bridge_parent) # backpointer
+ self.port_cfg = ovsrec_port
+
+ self.ifaces = set()
+
+
+class VSCtlIface(object):
+ def __init__(self, vsctl_port_parent, ovsrec_iface):
+ super(VSCtlIface, self).__init__()
+ self.port = weakref.ref(vsctl_port_parent) # backpointer
+ self.iface_cfg = ovsrec_iface
+
+
+class VSCtlContext(object):
+ def _invalidate_cache(self):
+ self.cache_valid = False
+ self.bridges.clear()
+ self.ports.clear()
+ self.ifaces.clear()
+
+ def __init__(self, idl_, txn, ovsrec_open_vswitch):
+ super(VSCtlContext, self).__init__()
+
+ # Modifiable state
+ # self.table = None
+ self.idl = idl_
+ self.txn = txn
+ self.ovs = ovsrec_open_vswitch
+ self.symtab = None # TODO:XXX
+ self.verified_ports = False
+
+ # A cache of the contents of the database.
+ self.cache_valid = False
+ self.bridges = {} # bridge name -> VSCtlBridge
+ self.ports = {} # port name -> VSCtlPort
+ self.ifaces = {} # iface name -> VSCtlIface
+
+ self.try_again = False # used by wait-until command
+
+ def done(self):
+ self._invalidate_cache()
+
+ def verify_bridges(self):
+ self.ovs.verify(vswitch_idl.OVSREC_OPEN_VSWITCH_COL_BRIDGES)
+
+ def verify_ports(self):
+ if self.verified_ports:
+ return
+
+ self.verify_bridges()
+ for ovsrec_bridge in self.idl.tables[
+ vswitch_idl.OVSREC_TABLE_BRIDGE].rows.values():
+ ovsrec_bridge.verify(vswitch_idl.OVSREC_BRIDGE_COL_PORTS)
+ for ovsrec_port in self.idl.tables[
+ vswitch_idl.OVSREC_TABLE_PORT].rows.values():
+ ovsrec_port.verify(vswitch_idl.OVSREC_PORT_COL_INTERFACES)
+ self.verified_ports = True
+
+ def add_bridge_to_cache(self, ovsrec_bridge, name, parent, vlan):
+ vsctl_bridge = VSCtlBridge(ovsrec_bridge, name, parent, vlan)
+ if parent:
+ parent.children.add(vsctl_bridge)
+ self.bridges[name] = vsctl_bridge
+ return vsctl_bridge
+
+ def add_port_to_cache(self, vsctl_bridge_parent, ovsrec_port):
+ tag = getattr(ovsrec_port, vswitch_idl.OVSREC_PORT_COL_TAG, None)
+ if (tag is not None and tag >= 0 and tag < 4096):
+ vlan_bridge = vsctl_bridge_parent.find_vlan_bridge()
+ if vlan_bridge:
+ vsctl_bridge_parent = vlan_bridge
+
+ vsctl_port = VSCtlPort(vsctl_bridge_parent, ovsrec_port)
+ vsctl_bridge_parent.ports.add(vsctl_port)
+ self.ports[ovsrec_port.name] = vsctl_port
+ return vsctl_port
+
+ def del_cached_port(self, vsctl_port):
+ assert not vsctl_port.ifaces
+ vsctl_port.bridge().ports.remove(vsctl_port)
+ vsctl_port.bridge = None
+ port = self.ports.pop(vsctl_port.port_cfg.name)
+ assert port == vsctl_port
+ vsctl_port.port_cfg.delete()
+
+ def add_iface_to_cache(self, vsctl_port_parent, ovsrec_iface):
+ vsctl_iface = VSCtlIface(vsctl_port_parent, ovsrec_iface)
+ vsctl_port_parent.ifaces.add(vsctl_iface)
+ self.ifaces[ovsrec_iface.name] = vsctl_iface
+
+ def del_cached_iface(self, vsctl_iface):
+ vsctl_iface.port().ifaces.remove(vsctl_iface)
+ vsctl_iface.port = None
+ del self.ifaces[vsctl_iface.iface_cfg.name]
+ vsctl_iface.iface_cfg.delete()
+
+ def invalidate_cache(self):
+ if not self.cache_valid:
+ return
+ self._invalidate_cache()
+
+ def populate_cache(self):
+ self._populate_cache(self.idl.tables[vswitch_idl.OVSREC_TABLE_BRIDGE])
+
+ @staticmethod
+ def port_is_fake_bridge(ovsrec_port):
+ return (ovsrec_port.fake_bridge and
+ ovsrec_port.tag >= 0 and ovsrec_port.tab <= 4095)
+
+ def _populate_cache(self, ovsrec_bridges):
+ if self.cache_valid:
+ return
+ self.cache_valid = True
+
+ bridges = set()
+ ports = set()
+ for ovsrec_bridge in ovsrec_bridges.rows.values():
+ name = ovsrec_bridge.name
+ if name in bridges:
+ LOG.warn('%s: database contains duplicate bridge name', name)
+ bridges.add(name)
+ vsctl_bridge = self.add_bridge_to_cache(ovsrec_bridge, name,
+ None, 0)
+ if not vsctl_bridge:
+ continue
+ for ovsrec_port in ovsrec_bridge.ports:
+ port_name = ovsrec_port.name
+ if port_name in ports:
+ # Duplicate ovsrec_port name.
+ # (We will warn about that later.)
+ continue
+ ports.add(port_name)
+ if (self.port_is_fake_bridge(ovsrec_port) and
+ port_name not in bridges):
+ bridges.add(port_name)
+ self.add_bridge_to_cache(None, port_name, vsctl_bridge,
+ ovsrec_port.tag)
+
+ bridges = set()
+ for ovsrec_bridge in ovsrec_bridges.rows.values():
+ name = ovsrec_bridge.name
+ if name in bridges:
+ continue
+ bridges.add(name)
+ vsctl_bridge = self.bridges[name]
+ for ovsrec_port in ovsrec_bridge.ports:
+ port_name = ovsrec_port.name
+ vsctl_port = self.ports.get(port_name)
+ if vsctl_port:
+ if ovsrec_port == vsctl_port.port_cfg:
+ LOG.warn('%s: vsctl_port is in multiple bridges '
+ '(%s and %s)',
+ port_name, vsctl_bridge.name,
+ vsctl_port.br.name)
+ else:
+ LOG.error('%s: database contains duplicate '
+ 'vsctl_port name',
+ ovsrec_port.name)
+ continue
+
+ if (self.port_is_fake_bridge(ovsrec_port) and
+ port_name in bridges):
+ continue
+
+ # LOG.debug('ovsrec_port %s %s %s',
+ # ovsrec_port, ovsrec_port._data, ovsrec_port.tag)
+ vsctl_port = self.add_port_to_cache(vsctl_bridge, ovsrec_port)
+ # LOG.debug('vsctl_port %s', vsctl_port)
+ for ovsrec_iface in ovsrec_port.interfaces:
+ iface = self.ifaces.get(ovsrec_iface.name)
+ if iface:
+ if ovsrec_iface == iface.iface_cfg:
+ LOG.warn(
+ '%s: interface is in multiple ports '
+ '(%s and %s)',
+ ovsrec_iface.name,
+ iface.port().port_cfg.name,
+ vsctl_port.port_cfg.name)
+ else:
+ LOG.error(
+ '%s: database contains duplicate interface '
+ 'name',
+ ovsrec_iface.name)
+ continue
+ self.add_iface_to_cache(vsctl_port, ovsrec_iface)
+
+ def check_conflicts(self, name, msg):
+ self.verify_ports()
+ if name in self.bridges:
+ vsctl_fatal('%s because a bridge named %s already exists' %
+ (msg, name))
+ if name in self.ports:
+ vsctl_fatal('%s because a port named %s already exists on '
+ 'bridge %s' %
+ (msg, name, self.ports[name].bridge().name))
+ if name in self.ifaces:
+ vsctl_fatal('%s because an interface named %s already '
+ 'exists on bridge %s' %
+ (msg, name, self.ifaces[name].port().bridge().name))
+
+ def find_bridge(self, name, must_exist):
+ assert self.cache_valid
+ vsctl_bridge = self.bridges.get(name)
+ if must_exist and not vsctl_bridge:
+ vsctl_fatal('no bridge named %s' % name)
+ self.verify_bridges()
+ return vsctl_bridge
+
+ def find_real_bridge(self, name, must_exist):
+ vsctl_bridge = self.find_bridge(name, must_exist)
+ if vsctl_bridge and vsctl_bridge.parent:
+ vsctl_fatal('%s is a fake bridge' % name)
+ return vsctl_bridge
+
+ def find_bridge_by_id(self, datapath_id, must_exist):
+ assert self.cache_valid
+ for vsctl_bridge in self.bridges.values():
+ if vsctl_bridge.br_cfg.datapath_id[0].strip('"') == datapath_id:
+ self.verify_bridges()
+ return vsctl_bridge
+
+ if must_exist:
+ vsctl_fatal('no bridge id %s' % datapath_id)
+ return None
+
+ def find_port(self, name, must_exist):
+ assert self.cache_valid
+ vsctl_port = self.ports.get(name)
+ if vsctl_port and name == vsctl_port.bridge().name:
+ vsctl_port = None
+ if must_exist and not vsctl_port:
+ vsctl_fatal('no vsctl_port named %s' % name)
+ return vsctl_port
+
+ def find_iface(self, name, must_exist):
+ assert self.cache_valid
+ vsctl_iface = self.ifaces.get(name)
+ if vsctl_iface and name == vsctl_iface.port().bridge().name:
+ vsctl_iface = None
+ if must_exist and not vsctl_iface:
+ vsctl_fatal('no interface named %s' % name)
+ self.verify_ports()
+ return vsctl_iface
+
+ @staticmethod
+ def _column_set(ovsrec_row, column, ovsrec_value):
+ # need to trigger Row.__setattr__()
+ setattr(ovsrec_row, column, ovsrec_value)
+
+ @staticmethod
+ def _column_insert(ovsrec_row, column, ovsrec_add):
+ value = getattr(ovsrec_row, column)
+ value.append(ovsrec_add)
+ VSCtlContext._column_set(ovsrec_row, column, value)
+
+ @staticmethod
+ def _column_delete(ovsrec_row, column, ovsrec_del):
+ value = getattr(ovsrec_row, column)
+ try:
+ value.remove(ovsrec_del)
+ except ValueError:
+ # Datum.to_python() with _uuid_to_row trims down deleted
+ # references. If ovsrec_del.delete() is called before
+ # _column_delete(), value doesn't include ovsrec_del.
+ pass
+
+ VSCtlContext._column_set(ovsrec_row, column, value)
+
+ @staticmethod
+ def bridge_insert_port(ovsrec_bridge, ovsrec_port):
+ VSCtlContext._column_insert(ovsrec_bridge,
+ vswitch_idl.OVSREC_BRIDGE_COL_PORTS,
+ ovsrec_port)
+
+ @staticmethod
+ def bridge_delete_port(ovsrec_bridge, ovsrec_port):
+ VSCtlContext._column_delete(ovsrec_bridge,
+ vswitch_idl.OVSREC_BRIDGE_COL_PORTS,
+ ovsrec_port)
+
+ def del_port(self, vsctl_port):
+ if vsctl_port.bridge().parent:
+ ovsrec_bridge = vsctl_port.bridge().parent.br_cfg
+ else:
+ ovsrec_bridge = vsctl_port.bridge().br_cfg
+ self.bridge_delete_port(ovsrec_bridge, vsctl_port.port_cfg)
+
+ for vsctl_iface in vsctl_port.ifaces.copy():
+ self.del_cached_iface(vsctl_iface)
+ self.del_cached_port(vsctl_port)
+
+ def add_port(self, br_name, port_name, may_exist, fake_iface,
+ iface_names, settings=None):
+ """
+ :type settings: list of (column, key, value_json)
+ where column and key are str,
+ value_json is json that is represented
+ by Datum.to_json()
+ """
+ settings = settings or []
+
+ self.populate_cache()
+ if may_exist:
+ vsctl_port = self.find_port(port_name, False)
+ if vsctl_port:
+ want_names = set(iface_names)
+ have_names = set(ovsrec_iface.name for ovsrec_iface in
+ vsctl_port.port_cfg.interfaces)
+ if vsctl_port.bridge().name != br_name:
+ vsctl_fatal('"%s" but %s is actually attached to '
+ 'vsctl_bridge %s',
+ br_name, port_name, vsctl_port.bridge().name)
+ if want_names != have_names:
+ want_names_string = ','.join(want_names)
+ have_names_string = ','.join(have_names)
+ vsctl_fatal('"%s" but %s actually has interface(s) %s' %
+ (want_names_string,
+ port_name, have_names_string))
+ return
+ self.check_conflicts(port_name,
+ 'cannot create a port named %s' % port_name)
+ for iface_name in iface_names:
+ self.check_conflicts(
+ iface_name, 'cannot create an interface named %s' % iface_name)
+
+ vsctl_bridge = self.find_bridge(br_name, True)
+ ifaces = []
+ for iface_name in iface_names:
+ ovsrec_iface = self.txn.insert(
+ self.idl.tables[vswitch_idl.OVSREC_TABLE_INTERFACE])
+ ovsrec_iface.name = iface_name
+ ifaces.append(ovsrec_iface)
+
+ ovsrec_port = self.txn.insert(
+ self.idl.tables[vswitch_idl.OVSREC_TABLE_PORT])
+ ovsrec_port.name = port_name
+ ovsrec_port.interfaces = ifaces
+ ovsrec_port.bond_fake_iface = fake_iface
+
+ if vsctl_bridge.parent:
+ tag = vsctl_bridge.vlan
+ ovsrec_port.tag = tag
+ for setting in settings:
+ # TODO:XXX self.symtab:
+ column, key, value = setting
+ self.set_column(ovsrec_port, column, key, value)
+
+ if vsctl_bridge.parent:
+ ovsrec_bridge = vsctl_bridge.parent.br_cfg
+ else:
+ ovsrec_bridge = vsctl_bridge.br_cfg
+ self.bridge_insert_port(ovsrec_bridge, ovsrec_port)
+ vsctl_port = self.add_port_to_cache(vsctl_bridge, ovsrec_port)
+ for ovsrec_iface in ifaces:
+ self.add_iface_to_cache(vsctl_port, ovsrec_iface)
+
+ @staticmethod
+ def parse_column_key_value(table_schema, setting_string):
+ """
+ parse <column>[:<key>]=<value>
+ """
+ column_value = setting_string.split('=', 1)
+ if len(column_value) == 1:
+ column = column_value[0]
+ value = None
+ else:
+ column, value = column_value
+
+ if ':' in column:
+ column, key = column.split(':', 1)
+ else:
+ key = None
+ if value is not None:
+ LOG.debug("columns %s", table_schema.columns.keys())
+ type_ = table_schema.columns[column].type
+ value = datum_from_string(type_, value)
+ LOG.debug("column %s value %s", column, value)
+
+ return (column, key, value)
+
+ def set_column(self, ovsrec_row, column, key, value_json):
+ if column not in ovsrec_row._table.columns:
+ vsctl_fatal('%s does not contain a column whose name matches "%s"'
+ % (ovsrec_row._table.name, column))
+
+ column_schema = ovsrec_row._table.columns[column]
+ if key is not None:
+ value_json = ['map', [[key, value_json]]]
+ if column_schema.type.value.type == ovs.db.types.VoidType:
+ vsctl_fatal('cannot specify key to set for non-map column %s' %
+ column)
+ datum = ovs.db.data.Datum.from_json(column_schema.type, value_json,
+ self.symtab)
+ values = getattr(ovsrec_row, column, {})
+ values.update(datum.to_python(ovs.db.idl._uuid_to_row))
+ setattr(ovsrec_row, column, values)
+ else:
+ datum = ovs.db.data.Datum.from_json(column_schema.type, value_json,
+ self.symtab)
+ setattr(ovsrec_row, column,
+ datum.to_python(ovs.db.idl._uuid_to_row))
+
+ def _get_row_by_id(self, table_name, vsctl_row_id, record_id):
+ if not vsctl_row_id.table:
+ return None
+
+ if not vsctl_row_id.name_column:
+ if record_id != '.':
+ return None
+ values = self.idl.tables[vsctl_row_id.table].rows.values()
+ if not values or len(values) > 2:
+ return None
+ referrer = values[0]
+ else:
+ referrer = None
+ for ovsrec_row in self.idl.tables[
+ vsctl_row_id.table].rows.values():
+ name = getattr(ovsrec_row, vsctl_row_id.name_column)
+ assert type(name) in (list, str, unicode)
+ if type(name) != list and name == record_id:
+ if (referrer):
+ vsctl_fatal('multiple rows in %s match "%s"' %
+ (table_name, record_id))
+ referrer = ovsrec_row
+
+ if not referrer:
+ return None
+
+ final = None
+ if vsctl_row_id.uuid_column:
+ referrer.verify(vsctl_row_id.uuid_column)
+ uuid = getattr(referrer, vsctl_row_id.uuid_column)
+
+ uuid_ = referrer._data[vsctl_row_id.uuid_column]
+ assert uuid_.type.key.type == ovs.db.types.UuidType
+ assert uuid_.type.value is None
+ assert type(uuid) == list
+
+ if len(uuid) == 1:
+ final = uuid[0]
+ else:
+ final = referrer
+
+ return final
+
+ def get_row(self, vsctl_table, record_id):
+ table_name = vsctl_table.table_name
+ if ovsuuid.is_valid_string(record_id):
+ uuid = ovsuuid.from_string(record_id)
+ return self.idl.tables[table_name].rows.get(uuid)
+ else:
+ for vsctl_row_id in vsctl_table.row_ids:
+ ovsrec_row = self._get_row_by_id(table_name, vsctl_row_id,
+ record_id)
+ if ovsrec_row:
+ return ovsrec_row
+
+ return None
+
+ def must_get_row(self, vsctl_table, record_id):
+ ovsrec_row = self.get_row(vsctl_table, record_id)
+ if not ovsrec_row:
+ vsctl_fatal('no row "%s" in table %s' % (record_id,
+ vsctl_table.table_name))
+ return ovsrec_row
+
+
+class _VSCtlRowID(object):
+ def __init__(self, table, name_column, uuid_column):
+ super(_VSCtlRowID, self).__init__()
+ self.table = table
+ self.name_column = name_column
+ self.uuid_column = uuid_column
+
+
+class _VSCtlTable(object):
+ def __init__(self, table_name, vsctl_row_id_list):
+ super(_VSCtlTable, self).__init__()
+ self.table_name = table_name
+ self.row_ids = vsctl_row_id_list
+
+
+class VSCtlCommand(object):
+ def __init__(self, command, args=None, options=None):
+ super(VSCtlCommand, self).__init__()
+ self.command = command
+ self.args = args or []
+ self.options = options or []
+
+ # Data modified by commands
+ self.result = None
+
+ # internally used by VSCtl
+ self._prerequisite = None
+ self._run = None
+
+ def has_option(self, option):
+ return option in self.options
+
+
+class VSCtl(object):
+ def _reset(self):
+ self.schema_helper = None
+ self.ovs = None
+ self.txn = None
+ self.wait_for_reload = True
+ self.dry_run = False
+
+ def __init__(self, remote):
+ super(VSCtl, self).__init__()
+ self.remote = remote
+
+ self.schema_json = None
+ self.schema = None
+ self.schema_helper = None
+ self.ovs = None
+ self.txn = None
+ self.wait_for_reload = True
+ self.dry_run = False
+
+ def _rpc_get_schema_json(self, database):
+ LOG.debug('remote %s', self.remote)
+ error, stream_ = stream.Stream.open_block(
+ stream.Stream.open(self.remote))
+ if error:
+ vsctl_fatal('error %s' % os.strerror(error))
+ rpc = jsonrpc.Connection(stream_)
+ request = jsonrpc.Message.create_request('get_schema', [database])
+ error, reply = rpc.transact_block(request)
+ rpc.close()
+
+ if error:
+ vsctl_fatal(os.strerror(error))
+ elif reply.error:
+ vsctl_fatal('error %s' % reply.error)
+ return reply.result
+
+ def _init_schema_helper(self):
+ if self.schema_json is None:
+ self.schema_json = self._rpc_get_schema_json(
+ vswitch_idl.OVSREC_DB_NAME)
+ schema_helper = idl.SchemaHelper(None, self.schema_json)
+ schema_helper.register_all()
+ self.schema = schema_helper.get_idl_schema()
+ # LOG.debug('schema_json %s', schema_json)
+ self.schema_helper = idl.SchemaHelper(None, self.schema_json)
+
+ @staticmethod
+ def _idl_block(idl_):
+ poller = ovs.poller.Poller()
+ idl_.wait(poller)
+ poller.block()
+
+ @staticmethod
+ def _idl_wait(idl_, seqno):
+ while idl_.change_seqno == seqno and not idl_.run():
+ VSCtl._idl_block(idl_)
+
+ def _run_prerequisites(self, commands):
+ schema_helper = self.schema_helper
+ schema_helper.register_table(vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH)
+ if self.wait_for_reload:
+ # LOG.debug('schema_helper._tables %s', schema_helper._tables)
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH,
+ [vswitch_idl.OVSREC_OPEN_VSWITCH_COL_CUR_CFG])
+
+ for command in commands:
+ if not command._prerequisite:
+ continue
+ ctx = VSCtlContext(None, None, None)
+ command._prerequisite(ctx, command)
+ ctx.done()
+
+ def _do_vsctl(self, idl_, commands):
+ txn = idl.Transaction(idl_)
+ self.txn = txn
+ if self.dry_run:
+ txn.dry_run = True
+
+ txn.add_comment('ovs-vsctl') # TODO:XXX add operation name. args
+ ovs_rows = idl_.tables[vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH].rows
+ if ovs_rows:
+ ovs_ = ovs_rows.values()[0]
+ else:
+ # XXX add verification that table is empty
+ ovs_ = txn.insert(
+ idl_.tables[vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH])
+
+ if self.wait_for_reload:
+ ovs_.increment(vswitch_idl.OVSREC_OPEN_VSWITCH_COL_NEXT_CFG)
+
+ # TODO:XXX
+ # symtab = ovsdb_symbol_table_create()
+ ctx = VSCtlContext(idl_, txn, ovs_)
+ for command in commands:
+ if not command._run:
+ continue
+ command._run(ctx, command)
+ if ctx.try_again:
+ return False
+ LOG.debug('result:\n%s', [command.result for command in commands])
+ ctx.done()
+
+ # TODO:XXX check if created symbols are really created, referenced.
+
+ status = txn.commit_block()
+ next_cfg = 0
+ if self.wait_for_reload and status == idl.Transaction.SUCCESS:
+ next_cfg = txn.get_increment_new_value()
+
+ # TODO:XXX
+ # if status in (idl.Transaction.UNCHANGED, idl.Transaction.SUCCESS):
+ # for command in commands:
+ # if not command.post_func:
+ # continue
+ # ctx = VSCtlContext(idl_, txn, self.ovs)
+ # command.post_func(ctx)
+ # ctx.done()
+
+ txn_ = self.txn
+ self.txn = None
+ txn = None
+
+ if status in (idl.Transaction.UNCOMMITTED, idl.Transaction.INCOMPLETE):
+ not_reached()
+ elif status == idl.Transaction.ABORTED:
+ vsctl_fatal('transaction aborted')
+ elif status in (idl.Transaction.UNCHANGED, idl.Transaction.SUCCESS):
+ pass
+ elif status == idl.Transaction.TRY_AGAIN:
+ return False
+ elif status == idl.Transaction.ERROR:
+ vsctl_fatal('transaction error: %s' % txn_.get_error())
+ elif status == idl.Transaction.NOT_LOCKED:
+ vsctl_fatal('database not locked')
+ else:
+ not_reached()
+
+ if self.wait_for_reload and status != idl.Transaction.UNCHANGED:
+ while True:
+ idl_.run()
+ if (ovs_.cur_cfg >= next_cfg):
+ break
+ self._idl_block(idl_)
+
+ return True
+
+ def _do_main(self, commands):
+ """
+ :type commands: list of VSCtlCommand
+ """
+ self._reset()
+ self._init_schema_helper()
+ self._run_prerequisites(commands)
+
+ idl_ = idl.Idl(self.remote, self.schema_helper)
+ seqno = idl_.change_seqno
+ while True:
+ self._idl_wait(idl_, seqno)
+
+ seqno = idl_.change_seqno
+ if self._do_vsctl(idl_, commands):
+ break
+
+ if self.txn:
+ self.txn.abort()
+ self.txn = None
+ # TODO:XXX
+ # ovsdb_symbol_table_destroy(symtab)
+
+ idl_.close()
+
+ def _run_command(self, commands):
+ """
+ :type commands: list of VSCtlCommand
+ """
+ all_commands = {
+ # Port. commands
+ 'list-ports': (self._pre_get_info, self._cmd_list_ports),
+ 'add-port': (self._pre_cmd_add_port, self._cmd_add_port),
+ 'del-port': (self._pre_get_info, self._cmd_del_port),
+
+ # Controller commands.
+ 'del-controller': (self._pre_controller, self._cmd_del_controller),
+ 'set-controller': (self._pre_controller, self._cmd_set_controller),
+
+ # Database commands.
+ 'get': (self._pre_cmd_get, self._cmd_get),
+ 'find': (self._pre_cmd_find, self._cmd_find),
+ 'set': (self._pre_cmd_set, self._cmd_set),
+
+ # for quantum_adapter
+ 'list-ifaces-verbose': (self._pre_cmd_list_ifaces_verbose,
+ self._cmd_list_ifaces_verbose),
+ }
+
+ for command in commands:
+ funcs = all_commands[command.command]
+ command._prerequisite, command._run = funcs
+ self._do_main(commands)
+
+ def run_command(self, commands, timeout_msec=None, exception=None):
+ if timeout_msec is None:
+ self._run_command(commands)
+ else:
+ with gevent.Timeout(timeout_msec * 1000, exception):
+ self._run_command(commands)
+
+ def _pre_get_info(self, _ctx, _command):
+ schema_helper = self.schema_helper
+
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH,
+ [vswitch_idl.OVSREC_OPEN_VSWITCH_COL_BRIDGES])
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_BRIDGE,
+ [vswitch_idl.OVSREC_BRIDGE_COL_NAME,
+ vswitch_idl.OVSREC_BRIDGE_COL_CONTROLLER,
+ vswitch_idl.OVSREC_BRIDGE_COL_FAIL_MODE,
+ vswitch_idl.OVSREC_BRIDGE_COL_PORTS])
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_PORT,
+ [vswitch_idl.OVSREC_PORT_COL_NAME,
+ vswitch_idl.OVSREC_PORT_COL_FAKE_BRIDGE,
+ vswitch_idl.OVSREC_PORT_COL_TAG,
+ vswitch_idl.OVSREC_PORT_COL_INTERFACES])
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_INTERFACE,
+ [vswitch_idl.OVSREC_INTERFACE_COL_NAME])
+
+ def _list_ports(self, ctx, br_name):
+ ctx.populate_cache()
+ br = ctx.find_bridge(br_name, True)
+ if br.br_cfg:
+ br.br_cfg.verify(vswitch_idl.OVSREC_BRIDGE_COL_PORTS)
+ else:
+ br.parent.br_cfg.verify(vswitch_idl.OVSREC_BRIDGE_COL_PORTS)
+
+ return [port.port_cfg.name for port in br.ports
+ if port.port_cfg.name != br.name]
+
+ def _cmd_list_ports(self, ctx, command):
+ br_name = command.args[0]
+ port_names = self._list_ports(ctx, br_name)
+ command.result = sorted(port_names)
+
+ def _pre_add_port(self, _ctx, columns):
+ schema_helper = self.schema_helper
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_PORT,
+ [vswitch_idl.OVSREC_PORT_COL_NAME,
+ vswitch_idl.OVSREC_PORT_COL_BOND_FAKE_IFACE])
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_PORT, columns)
+
+ def _pre_cmd_add_port(self, ctx, command):
+ self._pre_get_info(ctx, command)
+
+ columns = [ctx.parse_column_key_value(
+ self.schema.tables[vswitch_idl.OVSREC_TABLE_PORT], setting)[0]
+ for setting in command.args[2:]]
+ self._pre_add_port(ctx, columns)
+
+ def _cmd_add_port(self, ctx, command):
+ may_exist = command.has_option('--may_exist')
+
+ br_name = command.args[0]
+ port_name = command.args[1]
+ iface_names = [command.args[1]]
+ settings = [ctx.parse_column_key_value(
+ self.schema.tables[vswitch_idl.OVSREC_TABLE_PORT], setting)
+ for setting in command.args[2:]]
+ ctx.add_port(br_name, port_name, may_exist,
+ False, iface_names, settings)
+
+ def _del_port(self, ctx, br_name=None, target=None,
+ must_exist=False, with_iface=False):
+ assert target is not None
+
+ ctx.populate_cache()
+ if not with_iface:
+ vsctl_port = ctx.find_port(target, must_exist)
+ else:
+ vsctl_port = ctx.find_port(target, False)
+ if not vsctl_port:
+ vsctl_iface = ctx.find_iface(target, False)
+ if vsctl_iface:
+ vsctl_port = vsctl_iface.port()
+ if must_exist and not vsctl_port:
+ vsctl_fatal('no port or interface named %s' % target)
+
+ if not vsctl_port:
+ return
+ if not br_name:
+ vsctl_bridge = ctx.find_bridge(br_name, True)
+ if vsctl_port.bridge() != vsctl_bridge:
+ if vsctl_port.bridge().parent == vsctl_bridge:
+ vsctl_fatal('bridge %s does not have a port %s (although '
+ 'its parent bridge %s does)' %
+ (br_name, target, vsctl_bridge.parent.name))
+ else:
+ vsctl_fatal('bridge %s does not have a port %s' %
+ (br_name, target))
+
+ ctx.del_port(vsctl_port)
+
+ def _cmd_del_port(self, ctx, command):
+ must_exist = command.has_option('--must-exist')
+ with_iface = command.has_option('--with-iface')
+ target = command.args[-1]
+ br_name = command.args[0] if len(command.args) == 2 else None
+ self._del_port(ctx, br_name, target, must_exist, with_iface)
+
+ def _pre_cmd_list_ifaces_verbose(self, ctx, command):
+ self._pre_get_info(ctx, command)
+ schema_helper = self.schema_helper
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_BRIDGE,
+ [vswitch_idl.OVSREC_BRIDGE_COL_DATAPATH_ID])
+ schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_INTERFACE,
+ [vswitch_idl.OVSREC_INTERFACE_COL_TYPE,
+ vswitch_idl.OVSREC_INTERFACE_COL_NAME,
+ vswitch_idl.OVSREC_INTERFACE_COL_EXTERNAL_IDS,
+ vswitch_idl.OVSREC_INTERFACE_COL_OPTIONS,
+ vswitch_idl.OVSREC_INTERFACE_COL_OFPORT])
+
+ @staticmethod
+ def _iface_to_dict(iface_cfg):
+ _ATTRIBUTE = ['name', 'ofport', 'type', 'external_ids', 'options']
+ attr = dict((key, getattr(iface_cfg, key)) for key in _ATTRIBUTE)
+
+ if attr['ofport']:
+ attr['ofport'] = attr['ofport'][0]
+ return attr
+
+ def _list_ifaces_verbose(self, ctx, datapath_id, port_name):
+ ctx.populate_cache()
+
+ br = ctx.find_bridge_by_id(datapath_id, True)
+ ctx.verify_ports()
+
+ iface_cfgs = []
+ if port_name is None:
+ for vsctl_port in br.ports:
+ iface_cfgs.extend(self._iface_to_dict(vsctl_iface.iface_cfg)
+ for vsctl_iface in vsctl_port.ifaces)
+ else:
+ # When port is created, ofport column might be None.
+ # So try with port name if it happended
+ port_name = port_name.rstrip('\0')
+ for vsctl_port in br.ports:
+ iface_cfgs.extend(
+ self._iface_to_dict(vsctl_iface.iface_cfg)
+ for vsctl_iface in vsctl_port.ifaces
+ if (vsctl_iface.iface_cfg.name == port_name))
+
+ return iface_cfgs
+
+ def _cmd_list_ifaces_verbose(self, ctx, command):
+ datapath_id = command.args[0]
+ port_name = None
+ if len(command.args) >= 2:
+ port_name = command.args[1]
+ LOG.debug('command.args %s', command.args)
+ iface_cfgs = self._list_ifaces_verbose(ctx, datapath_id, port_name)
+ command.result = sorted(iface_cfgs)
+
+ def _verify_controllers(self, ovsrec_bridge):
+ ovsrec_bridge.verify(vswitch_idl.OVSREC_BRIDGE_COL_CONTROLLER)
+ for controller in ovsrec_bridge.controller:
+ controller.verify(vswitch_idl.OVSREC_CONTROLLER_COL_TARGET)
+
+ def _pre_controller(self, ctx, command):
+ self._pre_get_info(ctx, command)
+ self.schema_helper.register_columns(
+ vswitch_idl.OVSREC_TABLE_CONTROLLER,
+ [vswitch_idl.OVSREC_CONTROLLER_COL_TARGET])
+
+ def _delete_controllers(self, ovsrec_controllers):
+ for controller in ovsrec_controllers:
+ controller.delete()
+
+ def _del_controller(self, ctx, br_name):
+ ctx.populate_cache()
+ br = ctx.find_real_bridge(br_name, True)
+ ovsrec_bridge = br.br_cfg
+ self._verify_controllers(ovsrec_bridge)
+ if ovsrec_bridge.controller:
+ self._delete_controllers(ovsrec_bridge.controller)
+ ovsrec_bridge.controller = []
+
+ def _cmd_del_controller(self, ctx, command):
+ br_name = command.args[0]
+ self._del_controller(ctx, br_name)
+
+ def _insert_controllers(self, controller_names):
+ ovsrec_controllers = []
+ for name in controller_names:
+ # TODO: check if the name startswith() supported protocols
+ ovsrec_controller = self.txn.insert(
+ self.txn.idl.tables[vswitch_idl.OVSREC_TABLE_CONTROLLER])
+ ovsrec_controller.target = name
+ ovsrec_controllers.append(ovsrec_controller)
+ return ovsrec_controllers
+
+ def _set_controller(self, ctx, br_name, controller_names):
+ ctx.populate_cache()
+ ovsrec_bridge = ctx.find_real_bridge(br_name, True).br_cfg
+ self._verify_controllers(ovsrec_bridge)
+ self._delete_controllers(ovsrec_bridge.controller)
+ controllers = self._insert_controllers(controller_names)
+ ovsrec_bridge.controller = controllers
+
+ def _cmd_set_controller(self, ctx, command):
+ br_name = command.args[0]
+ controller_names = command.args[1:]
+ self._set_controller(ctx, br_name, controller_names)
+
+ _TABLES = [
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_BRIDGE,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_BRIDGE,
+ vswitch_idl.OVSREC_BRIDGE_COL_NAME,
+ None)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_CONTROLLER,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_BRIDGE,
+ vswitch_idl.OVSREC_BRIDGE_COL_NAME,
+ vswitch_idl.OVSREC_BRIDGE_COL_CONTROLLER)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_INTERFACE,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_INTERFACE,
+ vswitch_idl.OVSREC_INTERFACE_COL_NAME,
+ None)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_MIRROR,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_MIRROR,
+ vswitch_idl.OVSREC_MIRROR_COL_NAME,
+ None)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_MANAGER,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_MANAGER,
+ vswitch_idl.OVSREC_MANAGER_COL_TARGET,
+ None)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_NETFLOW,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_BRIDGE,
+ vswitch_idl.OVSREC_BRIDGE_COL_NAME,
+ vswitch_idl.OVSREC_BRIDGE_COL_NETFLOW)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH,
+ None,
+ None)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_PORT,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_PORT,
+ vswitch_idl.OVSREC_PORT_COL_NAME,
+ None)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_QOS,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_PORT,
+ vswitch_idl.OVSREC_PORT_COL_NAME,
+ vswitch_idl.OVSREC_PORT_COL_QOS)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_QUEUE, []),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_SSL,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_OPEN_VSWITCH,
+ None,
+ vswitch_idl.OVSREC_OPEN_VSWITCH_COL_SSL)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_SFLOW,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_BRIDGE,
+ vswitch_idl.OVSREC_BRIDGE_COL_NAME,
+ vswitch_idl.OVSREC_BRIDGE_COL_SFLOW)]),
+ _VSCtlTable(vswitch_idl.OVSREC_TABLE_FLOW_TABLE,
+ [_VSCtlRowID(vswitch_idl.OVSREC_TABLE_FLOW_TABLE,
+ vswitch_idl.OVSREC_FLOW_TABLE_COL_NAME,
+ None)]),
+ ]
+
+ @staticmethod
+ def _score_partial_match(name, s):
+ _MAX_SCORE = 0xffffffff
+ assert len(name) < _MAX_SCORE
+ s = s[:_MAX_SCORE - 1] # in practice, this doesn't matter
+ if name == s:
+ return _MAX_SCORE
+
+ name = name.lower().replace('-', '_')
+ s = s.lower().replace('-', '_')
+ if s.startswith(name):
+ return _MAX_SCORE - 1
+ if name.startswith(s):
+ return len(s)
+
+ return 0
+
+ @staticmethod
+ def _get_table(table_name):
+ best_match = None
+ best_score = 0
+ for table in VSCtl._TABLES:
+ score = VSCtl._score_partial_match(table.table_name, table_name)
+ if score > best_score:
+ best_match = table
+ best_score = score
+ elif score == best_score:
+ best_match = None
+
+ if best_match:
+ return best_match
+ elif best_score:
+ vsctl_fatal('multiple table names match "%s"' % table_name)
+ else:
+ vsctl_fatal('unknown table "%s"' % table_name)
+
+ def _pre_get_table(self, _ctx, table_name):
+ vsctl_table = self._get_table(table_name)
+
+ schema_helper = self.schema_helper
+ schema_helper.register_table(vsctl_table.table_name)
+ for row_id in vsctl_table.row_ids:
+ if row_id.table:
+ schema_helper.register_table(row_id.table)
+ if row_id.name_column:
+ schema_helper.register_columns(row_id.table,
+ [row_id.name_column])
+ if row_id.uuid_column:
+ schema_helper.register_columns(row_id.table,
+ [row_id.uuid_column])
+ return vsctl_table
+
+ def _get_column(self, table_name, column_name):
+ best_match = None
+ best_score = 0
+
+ columns = self.schema.tables[table_name].columns.keys()
+ for column in columns:
+ score = VSCtl._score_partial_match(column, column_name)
+ if score > best_score:
+ best_match = column
+ best_score = score
+ elif score == best_score:
+ best_match = None
+
+ if best_match:
+ # ovs.db.schema_helper._keep_table_columns() requires that
+ # column_name is type of str. Not unicode string
+ return str(best_match)
+ elif best_score:
+ vsctl_fatal('%s contains more than one column whose name '
+ 'matches "%s"' % (table_name, column_name))
+ else:
+ vsctl_fatal('%s does not contain a column whose name matches '
+ '"%s"' % (table_name, column_name))
+
+ def _pre_get_column(self, _ctx, table_name, column):
+ column_name = self._get_column(table_name, column)
+ self.schema_helper.register_columns(table_name, [column_name])
+
+ def _pre_get(self, ctx, table_name, columns):
+ vsctl_table = self._pre_get_table(ctx, table_name)
+ for column in columns:
+ self._pre_get_column(ctx, vsctl_table.table_name, column)
+
+ def _pre_cmd_get(self, ctx, command):
+ table_name = command.args[0]
+ table_schema = self.schema.tables[table_name]
+ columns = [ctx.parse_column_key_value(table_schema, column_key)[0]
+ for column_key in command.args[2:]]
+ self._pre_get(ctx, table_name, columns)
+
+ def _get(self, ctx, table_name, record_id, column_keys,
+ id_=None, if_exists=False):
+ """
+ :type column_keys: list of (column, key_string)
+ where column and key are str
+ """
+ vsctl_table = self._get_table(table_name)
+ row = ctx.must_get_row(vsctl_table, record_id)
+ if id_:
+ raise NotImplementedError() # TODO:XXX
+
+ symbol, new = ctx.create_symbol(id_)
+ if not new:
+ vsctl_fatal('row id "%s" specified on "get" command was used '
+ 'before it was defined' % id_)
+ symbol.uuid = row.uuid
+ symbol.strong_ref = True
+
+ values = []
+ for column, key_string in column_keys:
+ row.verify(column)
+ datum = getattr(row, column)
+ if key_string:
+ if type(datum) != dict:
+ vsctl_fatal('cannot specify key to get for non-map column '
+ '%s' % column)
+ values.append(datum[key_string])
+ else:
+ values.append(datum)
+
+ return values
+
+ def _cmd_get(self, ctx, command):
+ id_ = None # TODO:XXX --id
+ if_exists = command.has_option('--if-exists')
+ table_name = command.args[0]
+ record_id = command.args[1]
+ table_schema = self.schema.tables[table_name]
+ column_keys = [ctx.parse_column_key_value(table_schema, column_key)[:2]
+ for column_key in command.args[2:]]
+
+ values = self._get(ctx, table_name, record_id, column_keys,
+ id_, if_exists)
+ command.result = values
+
+ def _pre_cmd_find(self, ctx, command):
+ table_name = command.args[0]
+ table_schema = self.schema.tables[table_name]
+ columns = [ctx.parse_column_key_value(table_schema,
+ column_key_value)[0]
+ for column_key_value in command.args[1:]]
+ LOG.debug('columns %s', columns)
+ self._pre_get(ctx, table_name, columns)
+
+ def _check_value(self, ovsrec_row, column_key_value):
+ column, key, value_json = column_key_value
+ column_schema = ovsrec_row._table.columns[column]
+ value = ovs.db.data.Datum.from_json(
+ column_schema.type, value_json).to_python(ovs.db.idl._uuid_to_row)
+ datum = getattr(ovsrec_row, column)
+ if key is None:
+ if datum == value:
+ return True
+ else:
+ if datum[key] != value:
+ return True
+ return False
+
+ def _find(self, ctx, table_name, column_key_values):
+ result = []
+ for ovsrec_row in ctx.idl.tables[table_name].rows.values():
+ LOG.debug('ovsrec_row %s', ovsrec_row_to_string(ovsrec_row))
+ if all(self._check_value(ovsrec_row, column_key_value)
+ for column_key_value in column_key_values):
+ result.append(ovsrec_row)
+
+ return result
+
+ def _cmd_find(self, ctx, command):
+ table_name = command.args[0]
+ table_schema = self.schema.tables[table_name]
+ column_key_values = [ctx.parse_column_key_value(table_schema,
+ column_key_value)
+ for column_key_value in command.args[1:]]
+ command.result = self._find(ctx, table_name, column_key_values)
+
+ def _check_mutable(self, table_name, column):
+ column_schema = self.schema.tables[table_name].columns[column]
+ if not column_schema.mutable:
+ vsctl_fatal('cannot modify read-only column %s in table %s' %
+ (column, table_name))
+
+ def _pre_set(self, ctx, table_name, columns):
+ self._pre_get_table(ctx, table_name)
+ for column in columns:
+ self._pre_get_column(ctx, table_name, column)
+ self._check_mutable(table_name, column)
+
+ def _pre_cmd_set(self, ctx, command):
+ table_name = command.args[0]
+ table_schema = self.schema.tables[table_name]
+ columns = [ctx.parse_column_key_value(table_schema,
+ column_key_value)[0]
+ for column_key_value in command.args[2:]]
+ self._pre_set(ctx, table_name, columns)
+
+ def _set(self, ctx, table_name, record_id, column_key_values):
+ """
+ :type column_key_values: list of (column, key_string, value_json)
+ """
+ vsctl_table = self._get_table(table_name)
+ ovsrec_row = ctx.must_get_row(vsctl_table, record_id)
+ for column, key, value in column_key_values:
+ ctx.set_column(ovsrec_row, column, key, value)
+ ctx.invalidate_cache()
+
+ def _cmd_set(self, ctx, command):
+ table_name = command.args[0]
+ record_id = command.args[1]
+
+ # column_key_value: <column>[:<key>]=<value>
+ table_schema = self.schema.tables[table_name]
+ column_key_values = [ctx.parse_column_key_value(table_schema,
+ column_key_value)
+ for column_key_value in command.args[2:]]
+
+ self._set(ctx, table_name, record_id, column_key_values)