diff options
author | IWASE Yusuke <iwase.yusuke0@gmail.com> | 2017-10-06 10:34:59 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2017-10-15 13:04:38 +0900 |
commit | f7752903604fbc334752867920947909d79c7039 (patch) | |
tree | c533e017344d9017e8260da3082ad184d66fe448 | |
parent | 6e69e9b8a6ebd2bb06aa864cc76ef0b79d1ac9c3 (diff) |
controller: Support proactive connection
This patch enables to initiate OpenFlow connection from controller side
by using "--ofp-switch-address-list" and "--ofp-switch-connect-interval"
options.
Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/controller/controller.py | 55 | ||||
-rw-r--r-- | ryu/lib/hub.py | 33 |
2 files changed, 85 insertions, 3 deletions
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 62bca5fb..84e0e05c 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -27,11 +27,11 @@ from ryu import cfg import logging from ryu.lib import hub from ryu.lib.hub import StreamServer -import traceback import random import ssl from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout -import warnings + +import netaddr import ryu.base.app_manager @@ -49,6 +49,7 @@ from ryu.lib.dpid import dpid_to_str LOG = logging.getLogger('ryu.controller.controller') DEFAULT_OFP_HOST = '0.0.0.0' +DEFAULT_OFP_SW_CON_INTERVAL = 1 CONF = cfg.CONF CONF.register_cli_opts([ @@ -62,7 +63,14 @@ CONF.register_cli_opts([ '(default: %d)' % ofproto_common.OFP_SSL_PORT), cfg.StrOpt('ctl-privkey', default=None, help='controller private key'), cfg.StrOpt('ctl-cert', default=None, help='controller certificate'), - cfg.StrOpt('ca-certs', default=None, help='CA certificates') + cfg.StrOpt('ca-certs', default=None, help='CA certificates'), + cfg.ListOpt('ofp-switch-address-list', item_type=str, default=[], + help='list of IP address and port pairs (default empty). ' + 'e.g., "127.0.0.1:6653,127.0.0.1:6633"'), + cfg.IntOpt('ofp-switch-connect-interval', + default=DEFAULT_OFP_SW_CON_INTERVAL, + help='interval in seconds to connect to switches ' + '(default %d)' % DEFAULT_OFP_SW_CON_INTERVAL), ]) CONF.register_opts([ cfg.FloatOpt('socket-timeout', @@ -78,6 +86,38 @@ CONF.register_opts([ ]) +def _split_addr(addr): + """ + Splits a str of IP address and port pair into (host, port). + + Example:: + + >>> _split_addr('127.0.0.1:6653') + ('127.0.0.1', 6653) + >>> _split_addr('[::1]:6653') + ('::1', 6653) + + Raises ValueError if invalid format. + + :param addr: A pair of IP address and port. + :return: IP address and port + """ + e = ValueError('Invalid IP address and port pair: "%s"' % addr) + pair = addr.rsplit(':', 1) + if len(pair) != 2: + raise e + + addr, port = pair + if addr.startswith('[') and addr.endswith(']'): + addr = addr.lstrip('[').rstrip(']') + if not netaddr.valid_ipv6(addr): + raise e + elif not netaddr.valid_ipv4(addr): + raise e + + return addr, int(port, 0) + + class OpenFlowController(object): def __init__(self): super(OpenFlowController, self).__init__() @@ -96,9 +136,18 @@ class OpenFlowController(object): # entry point def __call__(self): # LOG.debug('call') + for address in CONF.ofp_switch_address_list: + addr = tuple(_split_addr(address)) + self.spawn_client_loop(addr) + self.server_loop(self.ofp_tcp_listen_port, self.ofp_ssl_listen_port) + def spawn_client_loop(self, addr, interval=None): + interval = interval or CONF.ofp_switch_connect_interval + client = hub.StreamClient(addr) + hub.spawn(client.connect_loop, datapath_connection_factory, interval) + def server_loop(self, ofp_tcp_listen_port, ofp_ssl_listen_port): if CONF.ctl_privkey is not None and CONF.ctl_cert is not None: if CONF.ca_certs is not None: diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index a4f6118e..fbefadae 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -140,6 +140,39 @@ if HUB_TYPE == 'eventlet': sock, addr = self.server.accept() spawn(self.handle, sock, addr) + class StreamClient(object): + def __init__(self, addr, timeout=None, **ssl_args): + assert netaddr.valid_ipv4(addr[0]) or netaddr.valid_ipv6(addr[0]) + self.addr = addr + self.timeout = timeout + self.ssl_args = ssl_args + self._is_active = True + + def connect(self): + try: + if self.timeout is not None: + client = socket.create_connection(self.addr, + timeout=self.timeout) + else: + client = socket.create_connection(self.addr) + except socket.error: + return None + + if self.ssl_args: + client = ssl.wrap_socket(client, **self.ssl_args) + + return client + + def connect_loop(self, handle, interval): + while self._is_active: + sock = self.connect() + if sock: + handle(sock, self.addr) + sleep(interval) + + def stop(self): + self._is_active = False + class LoggingWrapper(object): def write(self, message): LOG.info(message.rstrip('\n')) |