Source code for finchan.dispatcher

# -*- coding: utf-8 -*-
# This file is part of finchan.

# Copyright (C) 2017-present qytz <hhhhhf@foxmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Event Dispatcher

get events from event source and dispatch evnets to the subscriber.
"""
import asyncio
import functools
import logging
import signal
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

def task_done(task):
    # logger.info("task %s is done: %s", task, task.result())
    try:
        task.result()
    except asyncio.CancelledError:
        logger.error("task[%s] has been canceled", task.task_name)
    except Exception as e:
        logger.error("task[%s] raise an exception:%s", task.task_name, e)


class Dispatcher:
    pass


class LiveTrackDispatcher(Dispatcher):
    """
    Dispatcher that dispatch real time events.
    the event sources generate events and put them into event queue,
    the main thread get events from event queue and dispatch them.

    :param env: global environment
    """

    def __init__(self, env):
        self.env = env

        conf = env.options.get("dispatcher")
        self._trace_process_time = conf.get("trace_process_time", False)
        self._start_dt = conf.get("start_dt", datetime.now())
        self._end_dt = conf.get("end_dt", datetime.strptime("9999-12-31", "%Y-%m-%d"))
        self._limit_time = conf.get("limit_time", True)

        self._thread_executor = None
        self._process_executor = None
        self._max_thread_workers = conf.get("thread_workers", None)

        self._main_task = None
        self._quit_event = None

    @property
    def now(self):
        """datetime of current logic time"""
        return datetime.now()

    def quit_handler(self, signame):
        """quit signal handler, callbacks for signal: ctrl_c/ctrl_\\."""
        self._quit_event.set()

    def register_signals(self, loop=None):
        """register signals for the loop"""
        loop = loop or asyncio.get_running_loop()
        if not loop:
            return False
        # for signame in {"SIGINT", "SIGTERM"}:
        for signame in {"SIGINT"}:
            loop.add_signal_handler(getattr(signal, signame), functools.partial(self.quit_handler, signame))

    @staticmethod
    def schedule_task(cor):
        """schedule the coroutine to run"""
        try:
            task = asyncio.create_task(cor)
        except AttributeError:
            task = asyncio.ensure_future(cor)
        task.task_name = repr(cor)
        task.add_done_callback(task_done)
        return task

    @staticmethod
    def all_tasks():
        """schedule the coroutine to run"""
        try:  # Python <3.7 has no the method
            return asyncio.all_tasks()
        except AttributeError:
            return asyncio.Task.all_tasks()

    @staticmethod
    def current_task():
        """get current active task"""
        try:  # Python <3.7 has no the method
            return asyncio.current_task()
        except AttributeError:
            return asyncio.Task.current_task()

    # --- async funcs ---
    async def quit_checker(self)->None:
        while True:
            await self.sleep(1)
            if len(self.all_tasks()) == 2:
                logger.debug("only one task")
                break
            if self.now >= self._end_dt:
                logger.debug("reach endtime")
                break
        self._quit_event.set()

    async def quit(self) -> None:
        """stop the dispatcher, do the cleanup stuffs"""
        await self._quit_event.wait()

        # system is exiting
        logger.info("#Dispatcher stop running.")
        # call extensions' cleanup
        await self.env.ext_manager.cleanup()

        # shutdown all executors
        if self._thread_executor:
            self._thread_executor.shutdown(wait=True)
        if self._process_executor:
            self._process_executor.shutdown(wait=True)

        # cancel all tasks
        for task in self.all_tasks():
            if task == asyncio.current_task() or task == self._main_task:
                continue
            task.cancel()

        for task in self.all_tasks():
            # no need to await current task
            if task == asyncio.current_task() or task == self._main_task:
                continue
            try:
                await task
            except asyncio.CancelledError:
                pass

        # loop = asyncio.get_running_loop()
        # if loop:
        #     loop.stop()

    async def run_in_thread(self, func, run_mode="thread", *args, **kwargs):
        """run synchronous func in the executor"""
        if not self._thread_executor:
            self._thread_executor = ThreadPoolExecutor(max_workers=self._max_thread_workers)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self._thread_executor, functools.partial(func, *args, **kwargs))

    async def run_in_process(self, func, *args, **kwargs):
        if not self._process_executor:
            self._process_executor = ProcessPoolExecutor()
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self._process_executor, functools.partial(func, *args, **kwargs))

    async def sleep(self, delay: float) -> None:
        return await asyncio.sleep(delay)

    async def run(self):
        """dispatch the event loop to run."""
        self._main_task = self.current_task()
        self._quit_event = asyncio.Event()
        self.register_signals(asyncio.get_running_loop())
        self.schedule_task(self.quit_checker())
        self.schedule_task(self.env.ext_manager.setup())
        await self.quit()


[docs]class BackTrackDispatcher(LiveTrackDispatcher): """ Dispatcher that dispatch backtrack events. the event sources generate events and put them into event queue, the main thread get events from event queue and dispatch them. :param env: global envirument """ def __init__(self, env): super().__init__(env) self.sleep_queue = [] self._now = self._start_dt self._need_forword = self._now < datetime.now() @property def now(self): """datetime of current logic time""" if self._limit_time: if not self._need_forword: return datetime.now() if self._now >= datetime.now(): self._need_forword = False return datetime.now() else: return self._now else: return self._now async def sleep(self, delay: float) -> None: self.sleep_queue.append(delay) self.sleep_queue.sort() while True: while len(self.all_tasks()) > 2: logger.debug("all_tasks:%s", self.all_tasks()) await asyncio.sleep(0.1) try: elasped = self.sleep_queue.pop(0) except IndexError: return logger.info("elasped:%s delay:%s", elasped, delay) self._now += timedelta(seconds=elasped) if elasped == delay: return async def get_events(self): pass async def foward(self): pass
[docs] async def run(self): """dispatch the event loop to run.""" self._main_task = self.current_task() self._quit_event = asyncio.Event() self.register_signals(asyncio.get_running_loop()) self.schedule_task(self.quit_checker()) self.schedule_task(self.env.ext_manager.setup()) while True: pass await self.quit()
def get_dispatcher(env): if env.run_mode == "backtrack": return BackTrackDispatcher(env) return LiveTrackDispatcher(env)