neutron web server启动过程

Posted by dodoYuan on February 15, 2018

函数入口为:

cmd.server中的main函数

def main():
    server.boot_server(wsgi_eventlet.eventlet_wsgi_server)

调用的函数原型为:

def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)

第一句启动了一个web server;第二句就是创建一个RPC server。先分析web server启动过程。

1、web server启动过程

serve_wsgi函数传入参数NeutronApiService,NeutronApiService是一个类,该类继承自WsgiService。

class NeutronApiService(WsgiService):
    """Class for neutron-api service."""
    def __init__(self, app_name):
        profiler.setup('neutron-server', cfg.CONF.host)
        super(NeutronApiService, self).__init__(app_name)

    @classmethod
    def create(cls, app_name='neutron'):
        # Setup logging early
        config.setup_logging()
        service = cls(app_name)
        return service

WsgiService是所有neutron api server的父类,里面定义了start等重要方法。

class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :<api>_listen: The address on which to listen
    :<api>_listen_port: The port on which to listen

    """

    def __init__(self, app_name):
        self.app_name = app_name
        self.wsgi_app = None

    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

    def wait(self):
        self.wsgi_app.wait()

上面分析了service.serve_wsgi(service.NeutronApiService)中传入的函数基本实现,接下来看看serve_wsgi函数,serve_wsgi调用NeutronApiService的create类方法,并启动。

def serve_wsgi(cls):

    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception('Unrecoverable error: please check log '
                          'for details.')

    registry.publish(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service

所以最终定为到service.start()函数,返回WsgiService类,发现关键代码为:

def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error('No known API applications configured.')
        return
    return run_wsgi_app(app)

百转千回,发现最终启动APP的是run_wsgi_app(app)函数,这里需要先加载APP(APP的名字使用默认neutron)。 load_paste_app函数在common.config.py文件中

def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    loader = wsgi.Loader(cfg.CONF)
    app = loader.load_app(app_name)
    return app

neutron源码中不支持from oslo_service import wsgi ,在OSLO项目中,找到程序如下:

class Loader(object):
    """Used to load WSGI applications from paste configurations."""

    def __init__(self, conf):
        """Initialize the loader, and attempt to find the config.
        :param conf: Application config
        :returns: None
        """
        conf.register_opts(_options.wsgi_opts)
        self.config_path = None

        config_path = conf.api_paste_config
        if not os.path.isabs(config_path):
            self.config_path = conf.find_file(config_path)
        elif os.path.exists(config_path):
            self.config_path = config_path

        if not self.config_path:
            raise ConfigNotFound(path=config_path)

    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.
        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound
        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception("Couldn't lookup app: %s", name)
            raise PasteAppNotFound(name=name, path=self.config_path)

上面程序主要用来加载一个 WSGI app,最终落实的函数是paste.deploy.loadapp,不在详细介绍。 接下来是run_wsgi_app函数,启动加载好的APP

def run_wsgi_app(app):
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info("Neutron service started, listening on %(host)s:%(port)s",
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server

思路很清楚,先加载一个Server实例,然后传入该APP并启动。其中Server函数为

class Server(object):
    """Server class to manage multiple WSGI sockets and applications."""

    def __init__(self, name, num_threads=None, disable_ssl=False):
        # Raise the default from 8192 to accommodate large tokens
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.num_threads = num_threads or CONF.wsgi_default_pool_size
        self.disable_ssl = disable_ssl
        # Pool for a greenthread in which wsgi server will be running
        self.pool = eventlet.GreenPool(1)
        self.name = name
        self._server = None
        # A value of 0 is converted to None because None is what causes the
        # wsgi server to wait forever.
        self.client_socket_timeout = CONF.client_socket_timeout or None
        if CONF.use_ssl and not self.disable_ssl:
            sslutils.is_enabled(CONF)

    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application."""
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

上面函数用来启动一个web server,在port和host上监听,

    def _launch(self, application, workers=0):
        service = WorkerService(self, application, self.disable_ssl, workers)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            # Dump the initial option values
            cfg.CONF.log_opt_values(LOG, logging.DEBUG)
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            api.context_manager.dispose_pool()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(
                cfg.CONF, wait_interval=1.0, restart_method='mutate')
            self._server.launch_service(service,
                                        workers=service.worker_process_count)

_launch函数又初始化了WorkerService类,传入了server自身和application实例。并且根据workers来决定是在当前进程还是另外启动新进程来运行web server

class WorkerService(neutron_worker.BaseWorker):
    """Wraps a worker to be handled by ProcessLauncher"""
    def __init__(self, service, application, disable_ssl=False,
                 worker_process_count=0):
        super(WorkerService, self).__init__(worker_process_count)

        self._service = service
        self._application = application
        self._disable_ssl = disable_ssl
        self._server = None

    def start(self):
        super(WorkerService, self).start()
        # When api worker is stopped it kills the eventlet wsgi server which
        # internally closes the wsgi server socket object. This server socket
        # object becomes not usable which leads to "Bad file descriptor"
        # errors on service restart.
        # Duplicate a socket object to keep a file descriptor usable.
        dup_sock = self._service._socket.dup()
        if CONF.use_ssl and not self._disable_ssl:
            dup_sock = sslutils.wrap(CONF, dup_sock)
        self._server = self._service.pool.spawn(self._service._run,
                                                self._application,
                                                dup_sock)

代码有点乱,梳理一下,在 run_wsgi_app函数中,实例化了wsgi.Server(“Neutron”),调用该函数的start,传入socket相关的参数和workers数,该start函数完成的功能有两个,建立一个socket服务端监听,并调用_launch(application, workers),而_launch函数又实例化service = WorkerService(self, application, self.disable_ssl, workers),并根据workers数目确定在当前进程启动还是在新增加进程启动,产生了一个分支。所以说到现在还没有启动web server。 在当前进程启动的话直接调用WorkerService的start函数,通过pool.spawn生成了一个新的协程,用来运行如下函数:

    def _run(self, application, socket):
        """Start a WSGI server in a new green thread."""
        eventlet.wsgi.server(socket, application,
                             max_size=self.num_threads,
                             log=LOG,
                             keepalive=CONF.wsgi_keep_alive,
                             log_format=CONF.wsgi_log_format,
                             socket_timeout=self.client_socket_timeout)

在新进程中启动web server方式不做多介绍。