diff options
author | IWASE Yusuke <iwase.yusuke0@gmail.com> | 2017-02-17 13:06:03 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2017-02-22 12:23:24 +0900 |
commit | 8914385148de87f5298666ab2b1df3258afc87cc (patch) | |
tree | 49f4f63b73f3292e765dd859790951ed149463f2 | |
parent | 1b486a0634baa667d99ae2a404a35b882e5fbf84 (diff) |
zebra: Implement Server APIs for Zebra protocol service
This patch implements APIs for performing as Zebra daemon of Quagga
and enables to integrate with other Quagga daemons.
Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/flags.py | 5 | ||||
-rw-r--r-- | ryu/services/protocols/zebra/server/__init__.py | 20 | ||||
-rw-r--r-- | ryu/services/protocols/zebra/server/event.py | 46 | ||||
-rw-r--r-- | ryu/services/protocols/zebra/server/zserver.py | 333 |
4 files changed, 404 insertions, 0 deletions
diff --git a/ryu/flags.py b/ryu/flags.py index ff53f76a..69eb3d22 100644 --- a/ryu/flags.py +++ b/ryu/flags.py @@ -79,6 +79,7 @@ DEFAULT_ZSERV_VERSION = 2 # Version of Ubuntu 16.04 LTS packaged Quagga DEFAULT_ZSERV_CLIENT_ROUTE_TYPE = 'BGP' DEFAULT_ZSERV_INTERVAL = 10 DEFAULT_ZSERV_DATABASE = 'sqlite:///zebra.db' +DEFAULT_ZSERV_ROUTER_ID = '1.1.1.1' CONF.register_cli_opts([ cfg.StrOpt( @@ -106,4 +107,8 @@ CONF.register_cli_opts([ 'db-url', default=DEFAULT_ZSERV_DATABASE, help='URL to database used by Zebra protocol service ' '(default: %s)' % DEFAULT_ZSERV_DATABASE), + cfg.StrOpt( + 'router-id', default=DEFAULT_ZSERV_ROUTER_ID, + help='Initial Router ID used by Zebra protocol service ' + '(default: %s)' % DEFAULT_ZSERV_ROUTER_ID), ], group='zapi') diff --git a/ryu/services/protocols/zebra/server/__init__.py b/ryu/services/protocols/zebra/server/__init__.py new file mode 100644 index 00000000..5b7319fb --- /dev/null +++ b/ryu/services/protocols/zebra/server/__init__.py @@ -0,0 +1,20 @@ +# Copyright (C) 2017 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Server implementation for Zebra protocol service. + +This module provides the server side implementation for Zebra protocol. +""" diff --git a/ryu/services/protocols/zebra/server/event.py b/ryu/services/protocols/zebra/server/event.py new file mode 100644 index 00000000..63ea6301 --- /dev/null +++ b/ryu/services/protocols/zebra/server/event.py @@ -0,0 +1,46 @@ +# Copyright (C) 2017 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Events generated by Zebra Server service. +""" + +from ryu.controller.event import EventBase + + +class EventZServerBase(EventBase): + """ + The base class for the event generated by ZServer. + """ + + +class EventZClientConnected(EventZServerBase): + """ + The event class for notifying the connection from Zebra client. + """ + + def __init__(self, zclient): + super(EventZClientConnected, self).__init__() + self.zclient = zclient + + +class EventZClientDisconnected(EventZServerBase): + """ + The event class for notifying the disconnection to Zebra client. + """ + + def __init__(self, zclient): + super(EventZClientDisconnected, self).__init__() + self.zclient = zclient diff --git a/ryu/services/protocols/zebra/server/zserver.py b/ryu/services/protocols/zebra/server/zserver.py new file mode 100644 index 00000000..f8119900 --- /dev/null +++ b/ryu/services/protocols/zebra/server/zserver.py @@ -0,0 +1,333 @@ +# Copyright (C) 2017 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Zebra Server corresponding to 'zserv' structure. +""" + +import contextlib +import logging +import os +import socket +import struct + +import netaddr + +from ryu import cfg +from ryu.base import app_manager +from ryu.base.app_manager import RyuApp +from ryu.controller.handler import set_ev_cls +from ryu.lib import hub +from ryu.lib.packet import zebra + +from ryu.services.protocols.zebra import db +from ryu.services.protocols.zebra import event +from ryu.services.protocols.zebra.server import event as zserver_event + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF['zapi'] +GLOBAL_CONF = cfg.CONF + +# Session to database of Zebra protocol service +SESSION = db.Session() + + +class ZClient(object): + """ + Zebra client class. + """ + + def __init__(self, server, sock, addr): + self.server = server + self.sock = sock + self.addr = addr + self.logger = server.logger + self.is_active = False + self._threads = [] + self.send_q = hub.Queue(16) + + # Zebra protocol version + self.zserv_ver = CONF.server_version + + # Zebra route type distributed by client (not initialized yet) + self.route_type = None + + def start(self): + self.is_active = True + self.sock.settimeout(GLOBAL_CONF.socket_timeout) + + self._threads.append(hub.spawn(self._send_loop)) + self._threads.append(hub.spawn(self._recv_loop)) + + self.server.send_event_to_observers( + zserver_event.EventZClientConnected(self)) + + hub.joinall(self._threads) + + self.server.send_event_to_observers( + zserver_event.EventZClientDisconnected(self)) + + def stop(self): + self.is_active = False + + def _send_loop(self): + try: + while self.is_active: + buf = self.send_q.get() + self.sock.sendall(buf) + except socket.error as e: + self.logger.exception( + 'Error while sending message to Zebra client%s: %s', + self.addr, e) + + self.stop() + + def _recv_loop(self): + buf = b'' + min_len = recv_len = zebra.ZebraMessage.get_header_size( + self.zserv_ver) + try: + while self.is_active: + try: + recv_buf = self.sock.recv(recv_len) + except socket.timeout: + continue + + if len(recv_buf) == 0: + break + + buf += recv_buf + while len(buf) >= min_len: + (length,) = struct.unpack_from('!H', buf) + if (length - len(buf)) > 0: + # Need to receive remaining data + recv_len = length - len(buf) + break + + msg, _, buf = zebra.ZebraMessage.parser(buf) + + ev = event.message_to_event(self, msg) + if ev: + self.logger.debug('Notify event: %s', ev) + self.server.send_event_to_observers(ev) + + except socket.error as e: + self.logger.exception( + 'Error while sending message to Zebra client%s: %s', + self.addr, e) + + self.stop() + + def send_msg(self, msg): + """ + Sends Zebra message. + + :param msg: Instance of py:class: `ryu.lib.packet.zebra.ZebraMessage`. + :return: Serialized msg if succeeded, otherwise None. + """ + if not self.is_active: + self.logger.debug( + 'Cannot send message: Already deactivated: msg=%s', msg) + return + elif not self.send_q: + self.logger.debug( + 'Cannot send message: Send queue does not exist: msg=%s', msg) + return + elif self.zserv_ver != msg.version: + self.logger.debug( + 'Zebra protocol version mismatch:' + 'server_version=%d, msg.version=%d', + self.zserv_ver, msg.version) + msg.version = self.zserv_ver # fixup + + self.send_q.put(msg.serialize()) + + +def zclient_connection_factory(sock, addr): + LOG.debug('Connected from client: %s: %s', addr, sock) + zserv = app_manager.lookup_service_brick(ZServer.__name__) + with contextlib.closing(ZClient(zserv, sock, addr)) as zclient: + try: + zclient.start() + except Exception as e: + LOG.error('Error in client%s: %s', addr, e) + raise e + + +def detect_address_family(host): + if netaddr.valid_ipv4(host): + return socket.AF_INET + elif netaddr.valid_ipv6(host): + return socket.AF_INET6 + elif os.path.isdir(os.path.dirname(host)): + return socket.AF_UNIX + else: + return None + + +class ZServer(RyuApp): + """ + The base class for Zebra server application. + """ + _EVENTS = event.ZEBRA_EVENTS + [ + zserver_event.EventZClientConnected, + zserver_event.EventZClientDisconnected, + ] + + def __init__(self, *args, **kwargs): + super(ZServer, self).__init__(*args, **kwargs) + self.zserv = None + self.zserv_addr = (CONF.server_host, CONF.server_port) + self.zapi_connection_family = detect_address_family(CONF.server_host) + + # Initial Router ID for Zebra server + self.router_id = CONF.router_id + + def start(self): + super(ZServer, self).start() + + if self.zapi_connection_family == socket.AF_UNIX: + unix_sock_dir = os.path.dirname(CONF.server_host) + # Makes sure the unix socket does not already exist + if os.path.exists(CONF.server_host): + os.remove(CONF.server_host) + if not os.path.isdir(unix_sock_dir): + os.mkdir(unix_sock_dir) + os.chmod(unix_sock_dir, 0o777) + + try: + self.zserv = hub.StreamServer( + self.zserv_addr, zclient_connection_factory) + except OSError as e: + self.logger.error( + 'Cannot start Zebra server%s: %s', self.zserv_addr, e) + raise e + + if self.zapi_connection_family == socket.AF_UNIX: + os.chmod(CONF.server_host, 0o777) + + self._add_lo_interface() + + return hub.spawn(self.zserv.serve_forever) + + def _add_lo_interface(self): + intf = db.interface.ip_link_add(SESSION, 'lo') + if intf: + self.logger.debug('Added interface "%s": %s', intf.ifname, intf) + + route = db.route.ip_route_add( + SESSION, + destination='127.0.0.0/8', + device='lo', + source='127.0.0.1/8', + route_type=zebra.ZEBRA_ROUTE_CONNECT) + if route: + self.logger.debug( + 'Added route to "%s": %s', route.destination, route) + + @set_ev_cls(event.EventZebraHello) + def _hello_handler(self, ev): + if ev.body is None: + self.logger.debug('Client %s says hello.', ev.zclient) + return + + # Set distributed route_type to ZClient + ev.zclient.route_type = ev.body.route_type + self.logger.debug( + 'Client %s says hello and bids fair to announce only %s routes', + ev.zclient, ev.body.route_type) + + @set_ev_cls(event.EventZebraRouterIDAdd) + def _router_id_add_handler(self, ev): + self.logger.debug( + 'Client %s requests router_id, server will response: router_id=%s', + ev.zclient, self.router_id) + + # Send ZEBRA_ROUTER_ID_UPDATE for response + msg = zebra.ZebraMessage( + body=zebra.ZebraRouterIDUpdate( + family=socket.AF_INET, + prefix='%s/32' % self.router_id)) + ev.zclient.send_msg(msg) + + @set_ev_cls(event.EventZebraInterfaceAdd) + def _interface_add_handler(self, ev): + self.logger.debug('Client %s requested all interfaces', ev.zclient) + + interfaces = db.interface.ip_address_show_all(SESSION) + self.logger.debug('Server will response interfaces: %s', interfaces) + for intf in interfaces: + msg = zebra.ZebraMessage( + body=zebra.ZebraInterfaceAdd( + ifname=intf.ifname, + ifindex=intf.ifindex, + status=intf.status, + if_flags=intf.flags, + metric=intf.metric, + ifmtu=intf.ifmtu, + ifmtu6=intf.ifmtu6, + bandwidth=intf.bandwidth, + ll_type=intf.ll_type, + hw_addr=intf.hw_addr)) + ev.zclient.send_msg(msg) + + routes = db.route.ip_route_show_all( + SESSION, ifindex=intf.ifindex, is_selected=True) + self.logger.debug('Server will response routes: %s', routes) + for route in routes: + dest, _ = route.destination.split('/') + msg = zebra.ZebraMessage( + body=zebra.ZebraInterfaceAddressAdd( + ifindex=intf.ifindex, + ifc_flags=0, + family=None, + prefix=route.source, + dest=dest)) + ev.zclient.send_msg(msg) + + @set_ev_cls([event.EventZebraIPv4RouteAdd, + event.EventZebraIPv6RouteAdd]) + def _ip_route_add_handler(self, ev): + self.logger.debug( + 'Client %s advertised IP route: %s', ev.zclient, ev.body) + + for nexthop in ev.body.nexthops: + route = db.route.ip_route_add( + SESSION, + destination=ev.body.prefix, + gateway=nexthop.addr, + ifindex=nexthop.ifindex or 0, + route_type=ev.body.route_type) + if route: + self.logger.debug( + 'Added route to "%s": %s', route.destination, route) + + @set_ev_cls([event.EventZebraIPv4RouteDelete, + event.EventZebraIPv6RouteDelete]) + def _ip_route_delete_handler(self, ev): + self.logger.debug( + 'Client %s withdrew IP route: %s', ev.zclient, ev.body) + + for nexthop in ev.body.nexthops: + routes = db.route.ip_route_delete( + SESSION, + destination=ev.body.prefix, + gateway=nexthop.addr, + route_type=ev.body.route_type) + if routes: + self.logger.debug( + 'Deleted routes to "%s": %s', ev.body.prefix, routes) |