summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIWASE Yusuke <iwase.yusuke0@gmail.com>2017-10-06 10:34:59 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2017-10-15 13:04:38 +0900
commitf7752903604fbc334752867920947909d79c7039 (patch)
treec533e017344d9017e8260da3082ad184d66fe448
parent6e69e9b8a6ebd2bb06aa864cc76ef0b79d1ac9c3 (diff)
controller: Support proactive connection
This patch enables to initiate OpenFlow connection from controller side by using "--ofp-switch-address-list" and "--ofp-switch-connect-interval" options. Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/controller/controller.py55
-rw-r--r--ryu/lib/hub.py33
2 files changed, 85 insertions, 3 deletions
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 62bca5fb..84e0e05c 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -27,11 +27,11 @@ from ryu import cfg
import logging
from ryu.lib import hub
from ryu.lib.hub import StreamServer
-import traceback
import random
import ssl
from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
-import warnings
+
+import netaddr
import ryu.base.app_manager
@@ -49,6 +49,7 @@ from ryu.lib.dpid import dpid_to_str
LOG = logging.getLogger('ryu.controller.controller')
DEFAULT_OFP_HOST = '0.0.0.0'
+DEFAULT_OFP_SW_CON_INTERVAL = 1
CONF = cfg.CONF
CONF.register_cli_opts([
@@ -62,7 +63,14 @@ CONF.register_cli_opts([
'(default: %d)' % ofproto_common.OFP_SSL_PORT),
cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
- cfg.StrOpt('ca-certs', default=None, help='CA certificates')
+ cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
+ cfg.ListOpt('ofp-switch-address-list', item_type=str, default=[],
+ help='list of IP address and port pairs (default empty). '
+ 'e.g., "127.0.0.1:6653,127.0.0.1:6633"'),
+ cfg.IntOpt('ofp-switch-connect-interval',
+ default=DEFAULT_OFP_SW_CON_INTERVAL,
+ help='interval in seconds to connect to switches '
+ '(default %d)' % DEFAULT_OFP_SW_CON_INTERVAL),
])
CONF.register_opts([
cfg.FloatOpt('socket-timeout',
@@ -78,6 +86,38 @@ CONF.register_opts([
])
+def _split_addr(addr):
+ """
+ Splits a str of IP address and port pair into (host, port).
+
+ Example::
+
+ >>> _split_addr('127.0.0.1:6653')
+ ('127.0.0.1', 6653)
+ >>> _split_addr('[::1]:6653')
+ ('::1', 6653)
+
+ Raises ValueError if invalid format.
+
+ :param addr: A pair of IP address and port.
+ :return: IP address and port
+ """
+ e = ValueError('Invalid IP address and port pair: "%s"' % addr)
+ pair = addr.rsplit(':', 1)
+ if len(pair) != 2:
+ raise e
+
+ addr, port = pair
+ if addr.startswith('[') and addr.endswith(']'):
+ addr = addr.lstrip('[').rstrip(']')
+ if not netaddr.valid_ipv6(addr):
+ raise e
+ elif not netaddr.valid_ipv4(addr):
+ raise e
+
+ return addr, int(port, 0)
+
+
class OpenFlowController(object):
def __init__(self):
super(OpenFlowController, self).__init__()
@@ -96,9 +136,18 @@ class OpenFlowController(object):
# entry point
def __call__(self):
# LOG.debug('call')
+ for address in CONF.ofp_switch_address_list:
+ addr = tuple(_split_addr(address))
+ self.spawn_client_loop(addr)
+
self.server_loop(self.ofp_tcp_listen_port,
self.ofp_ssl_listen_port)
+ def spawn_client_loop(self, addr, interval=None):
+ interval = interval or CONF.ofp_switch_connect_interval
+ client = hub.StreamClient(addr)
+ hub.spawn(client.connect_loop, datapath_connection_factory, interval)
+
def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port):
if CONF.ctl_privkey is not None and CONF.ctl_cert is not None:
if CONF.ca_certs is not None:
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index a4f6118e..fbefadae 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -140,6 +140,39 @@ if HUB_TYPE == 'eventlet':
sock, addr = self.server.accept()
spawn(self.handle, sock, addr)
+ class StreamClient(object):
+ def __init__(self, addr, timeout=None, **ssl_args):
+ assert netaddr.valid_ipv4(addr[0]) or netaddr.valid_ipv6(addr[0])
+ self.addr = addr
+ self.timeout = timeout
+ self.ssl_args = ssl_args
+ self._is_active = True
+
+ def connect(self):
+ try:
+ if self.timeout is not None:
+ client = socket.create_connection(self.addr,
+ timeout=self.timeout)
+ else:
+ client = socket.create_connection(self.addr)
+ except socket.error:
+ return None
+
+ if self.ssl_args:
+ client = ssl.wrap_socket(client, **self.ssl_args)
+
+ return client
+
+ def connect_loop(self, handle, interval):
+ while self._is_active:
+ sock = self.connect()
+ if sock:
+ handle(sock, self.addr)
+ sleep(interval)
+
+ def stop(self):
+ self._is_active = False
+
class LoggingWrapper(object):
def write(self, message):
LOG.info(message.rstrip('\n'))