前言
RYU控制器是日本NTT公司负责研发的一款开源的SDN/OpenFlow控制器,这个控制器是取名于日本的‘flow’的意思,所以叫RYU,RYU控制器完全有python语言编写,和POX类似。RYU控制器现在支持到OpenFlow版本的1.0,1.2,1.3,1.4版本, 同时支持与OpenStack结合使用,应用于云计算领域。
正文
本篇文章先从最简单的application example_switch_l3.py进行剖析,之后进一步分析整个RYU框架。 在程序中经常出现如下的修饰器:
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
datapath = ev.msg.datapath
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
# install the table-miss flow entry.
match = parser.OFPMatch()
actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
ofproto.OFPCML_NO_BUFFER)]
self.add_flow(datapath, 0, match, actions)
def set_ev_cls(ev_cls, dispatchers=None):
def _set_ev_cls_dec(handler):
if 'callers' not in dir(handler):
handler.callers = {}
for e in _listify(ev_cls):
handler.callers[e] = _Caller(_listify(dispatchers), e.__module__)
return handler
return _set_ev_cls_dec
set_ev_cls函数接收两个参数,第一个代表需要监听的事件,第二个参数表示该事件在交换机与控制器交互的哪个阶段发生有以下四个取值,可或。
HANDSHAKE_DISPATCHER = "handshake"
CONFIG_DISPATCHER = "config"
MAIN_DISPATCHER = "main"
DEAD_DISPATCHER = "dead"
若监听到EventOFPSwitchFeatures事件,就会触发下面的handler函数。
上述过程实际上是:
switch_features_handler=set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)(switch_features_handler)
整个过程可以描述为: 首先switch_features_handler作为参数传递给 _set_ev_cls_dec函数,先判断该函数有没有callers属性(因为同一函数也可能被其他事件触发),若没有则添加该属性,要注意函数也是具有属性的,并且可以在定义外面添加。 callers是一个字典,键值key是传入的事件类对应的列表形式,这里就是[ofp_event.EventOFPSwitchFeatures],value是一个_Caller对象,该对象主要就是记下dispatchers,和这个事件的模块名。 _Caller函数原型为:
class _Caller(object):
"""Describe how to handle an event class.
"""
def __init__(self, dispatchers, ev_source):
"""Initialize _Caller.
:param dispatchers: A list of states or a state, in which this
is in effect.
None and [] mean all states.
:param ev_source: The module which generates the event.
ev_cls.__module__ for set_ev_cls.
None for set_ev_handler.
"""
self.dispatchers = dispatchers
self.ev_source = ev_source
这是第一个部分,总结下来完成的工作就是给switch_features_handler函数添加了一个callers字典属性,里面保存了自己感兴趣的事件和dispatcher。
######事件是如何通知该函数的,ev这个类又是如何产生?
上面的问题也是最关心的问题,要理解这个还需要参考以下:
RYU main函数
RYU源码解读
RYU在运行 ryu-manage <application>
时,实际上先运行以下main函数。
def main(args=None, prog=None):
try:
CONF(args=args, prog=prog,
project='ryu', version='ryu-manager %s' % version,
default_config_files=['/usr/local/etc/ryu/ryu.conf'])
except cfg.ConfigFilesNotFoundError:
CONF(args=args, prog=prog,
project='ryu', version='ryu-manager %s' % version)
log.init_log()
logger = logging.getLogger(__name__)
if CONF.enable_debugger:
msg = 'debugging is available (--enable-debugger option is turned on)'
logger.info(msg)
else:
hub.patch(thread=True)
if CONF.pid_file:
import os
with open(CONF.pid_file, 'w') as pid_file:
pid_file.write(str(os.getpid()))
app_lists = CONF.app_lists + CONF.app
# keep old behavior, run ofp if no application is specified.
if not app_lists:
app_lists = ['ryu.controller.ofp_handler']
app_mgr = AppManager.get_instance()
app_mgr.load_apps(app_lists)
contexts = app_mgr.create_contexts()
services = []
services.extend(app_mgr.instantiate_apps(**contexts))
webapp = wsgi.start_service(app_mgr)
if webapp:
thr = hub.spawn(webapp)
services.append(thr)
try:
hub.joinall(services)
except KeyboardInterrupt:
logger.debug("Keyboard Interrupt received. "
"Closing RYU application manager...")
finally:
app_mgr.close()
首先就涉及到OSLO模块的使用。留以后分析
app_lists从输入的参数中获得,若没有则指定为ryu.controller.ofp_handler,然后运行
app_mgr = AppManager.get_instance()
调用APPManager类的静态方法,改静态方法为:
@staticmethod
def get_instance():
if not AppManager._instance:
AppManager._instance = AppManager()
return AppManager._instance
很明显,获得一个实例。
接下来,加载应用。
app_mgr.load_apps(app_lists)
def load_app(self, name):
mod = utils.import_module(name) #加载这个模块,mod代表的是什么结构?
clses = inspect.getmembers(mod,
lambda cls: (inspect.isclass(cls) and
issubclass(cls, RyuApp) and
mod.__name__ ==
cls.__module__))
if clses:
return clses[0][1]
return None
load_app函数是下面函数的调用函数,app_lists是命令行传入要运行的APP,可以有多个,通过依次 调用load_app进行加载。加载app为以下几步,首先调用utils.import_module(name)
,该函数返回整个模块的环境,包括类和方法,然后通过 inspect.getmembers过滤出我们写的class,也就是ExampleSwitch13,通过三个条件删选:类,RyuApp的子类,所在的文件为主文件(这个排除了其他地方import进来的类)。注意返回的是名字和对应的属性,也就是类名和对应的类,clses[0][1]表示返回第一个类,不是类名,类名是[0][0]。在官方文档中也有提到只运行APP实现的第一个类。
def load_apps(self, app_lists):
app_lists = [app for app
in itertools.chain.from_iterable(app.split(',')
for app in app_lists)]
while len(app_lists) > 0:
app_cls_name = app_lists.pop(0)
context_modules = [x.__module__ for x in self.contexts_cls.values()]
if app_cls_name in context_modules:
continue
LOG.info('loading app %s', app_cls_name)
cls = self.load_app(app_cls_name)
if cls is None:
continue
self.applications_cls[app_cls_name] = cls
services = []
for key, context_cls in cls.context_iteritems():
v = self.contexts_cls.setdefault(key, context_cls)
assert v == context_cls
context_modules.append(context_cls.__module__)
if issubclass(context_cls, RyuApp):
services.extend(get_dependent_services(context_cls))
# we can't load an app that will be initiataed for
# contexts.
for i in get_dependent_services(cls):
if i not in context_modules:
services.append(i)
if services:
app_lists.extend([s for s in set(services)
if s not in app_lists])
可能app_lists有多个app,先将他们统一为一个列表,并且依次加载执行。
以上函数完成依次加载app,其实就是找到该APP对应要运行的类,并把该类的名字存到applications_cls中,key是app的名字,如module.path.module_name
,值就是返回要运行的类。
加载的过程中顺便加载每个app依赖的app类,而且把每个app的context保存到context_cls中。依赖的APP保存在_CONTEXT中,这也是ryu实现的一种模块间通信机制。具体参考RYU模块间通信
到这一步,除了applications_cls,还完成了context_cls字典的构造。该字典内容和 _CONTEXTS中内容一致。是当前app所依赖的服务,如其他APP。并将该服务添加在services列表中。
def get_dependent_services(cls):
services = []
for _k, m in inspect.getmembers(cls, _is_method):
if _has_caller(m):
for ev_cls, c in m.callers.items():
service = getattr(sys.modules[ev_cls.__module__],
'_SERVICE_NAME', None)
if service:
# avoid cls that registers the own events (like
# ofp_handler)
if cls.__module__ != service:
services.append(service)
m = sys.modules[cls.__module__]
services.extend(getattr(m, '_REQUIRED_APP', []))
services = list(set(services))
return services
接下来对依赖的服务和本身服务调用get_dependent_services,原型如上。get_dependent_services函数获取其依赖应用,最后将所有的依赖services添加到app_lists中。该函数完成两个操作,一是判断该依赖项的method中有没有订阅事件,如果有就将ev_cls.__module__._SERVICE_NAME(也就是ofp_event._SERVICE_NAME)添加到services中.二是有没有_REQUIRED_APP,实际中都没怎么用到。 最后将依赖项添加到app_lists中。
create_contexts函数:实例化context_cls中的context,如果这个context是app,就实例化该app。记得在load_apps中没有管属于context_cls的app吗,在这里直接初始化了。
def create_contexts(self):
for key, cls in self.contexts_cls.items():
if issubclass(cls, RyuApp):
# hack for dpset
context = self._instantiate(None, cls) //初始化app
else:
context = cls() //实例化context
LOG.info('creating context %s', key)
assert key not in self.contexts
self.contexts[key] = context //加入contexts字典
return self.contexts //返回contexts字典
完成上述后,创建了contexts 字典,key是依赖项名字,value是初始化后的依赖项。 返回到main函数中,接下来运行
services = []
services.extend(app_mgr.instantiate_apps(**contexts))
即调用
def instantiate_apps(self, *args, **kwargs):
for app_name, cls in self.applications_cls.items():
self._instantiate(app_name, cls, *args, **kwargs)
self._update_bricks()
self.report_bricks()
threads = []
for app in self.applications.values():
t = app.start()
if t is not None:
app.set_main_thread(t)
threads.append(t)
return threads
def _instantiate(self, app_name, cls, *args, **kwargs):
# for now, only single instance of a given module
# Do we need to support multiple instances?
# Yes, maybe for slicing.
LOG.info('instantiating app %s of %s', app_name, cls.__name__)
if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)
if app_name is not None:
assert app_name not in self.applications
app = cls(*args, **kwargs)
register_app(app)
assert app.name not in self.applications
self.applications[app.name] = app
return app
_instantiate函数中就开始明朗了,先设置OFP_VERSIONS,做必要检查后运行 register_app(app)
。该函数完成两件事,一是applications[app.name] = app该字典的构造,比较applications_cls,key是app的名字,如module.path.module_name,值就是返回要运行的类。而这里,key是类名,而value是实例化的该类对象。二是注册该APP,如下
def register_app(app):
assert isinstance(app, RyuApp)
assert app.name not in SERVICE_BRICKS
SERVICE_BRICKS[app.name] = app
register_instance(app)
完成了重要的数据结构SERVICE_BRICKS的构造,其中key是要运行的类的名字,value是实例好的该类对象。
app.name表示这个类的名称,而app则代表这个类本身。name这个属性在基类RyuApp中有定义。
将该APP加入到SERVICE_BRICKS字典中,该字典代表一个服务链。值得注意的是,只要继承了基类RyuApp的app,则app.name都是该类的类名,因为定义为self.name = self.__class__.__name__
,但是ofp_handler中class OFPHandler对name属性进行了重写,self.name = ‘ofp_event’,在后面会有作用。
def register_instance(i):
for _k, m in inspect.getmembers(i, inspect.ismethod):
# LOG.debug('instance %s k %s m %s', i, _k, m)
if _has_caller(m):
for ev_cls, c in m.callers.items():
i.register_handler(ev_cls, m)
def _has_caller(meth):
return hasattr(meth, 'callers')
def register_handler(self, ev_cls, handler):
assert callable(handler)
self.event_handlers.setdefault(ev_cls, [])
self.event_handlers[ev_cls].append(handler)
和之前的开始连接起来,这里对APP的类中每一个method检查是否有callers属性,如果有,就注册句柄。最终将事件和对应要触发的函数以键值对形式保存在event_handlers字典中。
至此,生成了两个字典,一是 SERVICE_BRICKS[app.name] = app,代表APP运行的类,一是event_handlers,保存对应事件和触发函数。
再次回到instantiate_apps函数,还没有运行完呢。接下来是
self._update_bricks()
self.report_bricks()
def _update_bricks(self):
for i in SERVICE_BRICKS.values():
for _k, m in inspect.getmembers(i, inspect.ismethod):
if not hasattr(m, 'callers'):
continue
for ev_cls, c in m.callers.items():
if not c.ev_source:
continue
brick = _lookup_service_brick_by_mod_name(c.ev_source)
if brick:
brick.register_observer(ev_cls, i.name,
c.dispatchers)
# allow RyuApp and Event class are in different module
for brick in SERVICE_BRICKS.values():
if ev_cls in brick._EVENTS:
brick.register_observer(ev_cls, i.name,
c.dispatchers)
这个函数很关键,第一步对APP中的类进行检查是否有callers,如果有说明是修饰过,再进一步看c.ev_source,回顾
handler.callers[e] = _Caller(_listify(dispatchers), e.__module__)
其中 e表示事件类,c.ev_source表示该事件类所在模块名。其实c.ev_source如果是的module那么就是ofp_event,而OFPHandler类的名字正好就是’ofp_event’.
这里有一个技巧,其实c.ev_source如果是的module那么就是ofp_event,而OFPHandler类的名字正好就是’ofp_event’(在下文中会看到其实就是OpenFlowController),所以这里的brick就是OFPHandler,然后将将每种ev_cls的类型和app名字注册到该类中,其实本质上就是OFPHandler作为了一个消息源。这个函数非常重要,将每个app的handler与消息源OFPHandler建立了联系 ``` def lookup_service_brick(name): return SERVICE_BRICKS.get(name)
def _lookup_service_brick_by_mod_name(mod_name): return lookup_service_brick(mod_name.split(‘.’)[-1])
所以上面其实是返回SERVICE_BRICKS.get(‘ofp_event’),而对应的类为OFPHandler。也就是说,这里是根据事件所在的模块名加载服务块,brick = OFPHandler对象。
def register_observer(self, ev_cls, name, states=None):
states = states or set()
ev_cls_observers = self.observers.setdefault(ev_cls, {})
ev_cls_observers.setdefault(name, set()).update(states) ``` 上面程序细节比较多,先看参数,传入的en_cls是事件类,即订阅的事件,name为该类的类名,states为上文中提到的四中状态之一。构造observers字典,key是en_cls事件类,value是一个字典,过程如下: ``` >>> a.setdefault(4,{}) {} >>> a.setdefault(4,{}).setdefault('yuan',set()) set([]) >>> b = a.setdefault(4,{}) >>> b.setdefault('yuan',set()) set([]) >>> a {1: 'hello', 2: 'nihao', 3: 'world', 4: {'yuan': set([])}} >>> b.setdefault('yuan',set()).update('abc') >>> a {1: 'hello', 2: 'nihao', 3: 'world', 4: {'yuan': set(['a', 'c', 'b'])}} >>> ``` >总结来说,构造了一个较为复杂的observers字典,key是en_cls事件类,value为字典,其中key为类的名称,value为一个集合,其中保存的states。
整个函数_update_bricks函数对所有事件进行了注册,并生成了一个observers字典,注册在OFPHander类下,表示OFPHander为消息源。
接下来运行self.report_bricks(),如下
def report_bricks():
for brick, i in SERVICE_BRICKS.items():
AppManager._report_brick(brick, i)
def _report_brick(name, app):
LOG.debug("BRICK %s", name)
for ev_cls, list_ in app.observers.items():
LOG.debug(" PROVIDES %s TO %s", ev_cls.__name__, list_)
for ev_cls in app.event_handlers.keys():
LOG.debug(" CONSUMES %s", ev_cls.__name__)
这块代码主要负责显示信息。可以理解,brick代表服务链中的一个服务,也就是一个要运行的类,每个服务对应有两个字典,一是observers,另一个event_handlers,event_handlers是该类中保存的对应事件和触发函数。运行时会显示 CONSUMES
ev_cls.__name__
启动一个服务可能要依赖其他服务,比如ofp_event是所有APP都需要依赖的服务,被依赖的服务要提供相关服务,提供的服务就保存在observers中。
回到instantiate_apps函数,接下来,所有APP都启动。
threads = []
for app in self.applications.values():
t = app.start()
if t is not None:
app.set_main_thread(t)
threads.append(t)
return threads
def start(self):
"""
Hook that is called after startup initialization is done.
"""
self.threads.append(hub.spawn(self._event_loop))
def _event_loop(self):
while self.is_active or not self.events.empty():
ev, state = self.events.get()
self._events_sem.release()
if ev == self._event_stop:
continue
handlers = self.get_handlers(ev, state)
for handler in handlers:
try:
handler(ev)
except hub.TaskExit:
# Normal exit.
# Propagate upwards, so we leave the event loop.
raise
except:
LOG.exception('%s: Exception occurred during handler processing. '
'Backtrace from offending handler '
'[%s] servicing event [%s] follows.',
self.name, handler.__name__, ev.__class__.__name__)
hub.spawn(self._event_loop),利用eventlet框架创建一个协程,具体细节不深究。重点关注_event_loop函数。先搞清楚以下两个属性:
self.events = hub.Queue(128)
self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
即调用hub文件中
Queue = eventlet.queue.LightQueue
BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
查阅官网资料,
class eventlet.queue.LightQueue(maxsize=None) This is a variant of Queue that behaves mostly like the standard Stdlib_Queue. It differs by not supporting the task_done or joinmethods, and is a little faster for not having that overhead.
创建队列,和 Stdlib_Queue类似,但是性能相比较更好。
calss eventlet.semaphore.BoundedSemaphore(value=1) A bounded semaphore checks to make sure its current value doesn’t exceed its initial value. If it does, ValueError is raised. In most situations semaphores are used to guard resources with limited capacity. If the semaphore is released too many times it’s a sign of a bug. If not given, value defaults to 1. release(blocking=True) Release a semaphore, incrementing the internal counter by one. If the counter would exceed the initial value, raises ValueError. When it was zero on entry and another thread is waiting for it to become larger than zero again, wake up that thread
值得注意的是:
其中一个特殊的app是opf_handler app,其重写了start函数,其实调用start后就是启动OpenFlowController类,OpenFlowController启动后,ryu开始监听来自交换机的新连接
总结
整个RYU程序的启动可以分为两个部分,第一部分是APP的加载,上下文环境的加载,订阅事件的注册与分发。
第一部分主要分为以下几个步骤,在一些步骤中会有比较重要的数据结构需要注意,一开始有个数据结构要特别注意, set_ev_cls修饰的函数,表示订阅事件,会添加一个method, handler.callers[e] = _Caller(_listify(dispatchers), e.__module__)
。:
- 1、初始化APP管理类,AppManager
- 2、 生成APP服务列表(并不实例化),只是保存在对应字典中,
self.applications_cls[app_cls_name] = cls
,保存每个APP的名字和第一个功能类(满足是RyuApp子类等条件)- 3、加载每个app所依赖的模块,保存在
contexts_cls.setdefault(key, context_cls)
- 4、实例化依赖模块,即contexts_cls中的类。
- 5、实例化APPs。需要实例化的APP保存在applications_cls字典中。创建以下数据结构,一是SERVICE_BRICKS[app.name] = app表示服务链,二是所有订阅事件集合event_handlers,保存事件和触发函数的字典。三是observers字典,注册在OFPHander中,key是en_cls事件类,value为字典,其中key为类的名称,value为一个集合,其中保存的states。
- 6、运行每一个APP,并以协程方式管理。要注意的是 OFPHander重写了start函数,在该函数中启动了控制器类。下一篇文章中做深入分析。
参考学习资料
- SDN启动流程
- 命令解析库 OSLO http://lingxiankong.github.io/blog/2014/08/31/openstack-oslo-config/
- python修饰器
- 参考学习,站在别人的肩膀上
- RYU main函数
- RYU源码解读