# OneCat TaskQueue **Repository Path**: deali/OneCat-TaskQueue ## Basic Information - **Project Name**: OneCat TaskQueue - **Description**: No description available - **Primary Language**: Python - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-07-22 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README 又是造轮子系列咯,Python上有很多成熟完善的异步任务队列框架可以用,比如Celery,或者RQ,不过这些都不自带消息队列服务,都需要使用Redis、RabbitMQ之类的消息队列才行,我用到小项目中又不需要附带这么多东西,于是自己动手来实现咯。 ## 思路 1. 将需要异步执行的任务添加到队列 2. 自动从队列中取出任务,创建新线程执行 3. 保存任务的执行结果和输出 4. 任务完成,调用回调函数,处理返回的数据 5. 使用输出重定向处理任务的输出 ## Demo 附上使用任务队列的demo代码: ```python if __name__ == '__main__': task_queue = TaskQueue(output_redirect=True) def fun1(): time.sleep(2) return 1 def fun2(): time.sleep(3) return 2 task_queue.put(Task( func=fun1, callback=lambda task, result: print(f'task1 result: {result}'), )) task_queue.put(Task( func=fun2, callback=lambda task, result: print(f'task2 result: {result}'), )) def custom_output(msg): print(f'[custom] {msg}') def fun3(num1, num2): print(f'num1={num1}') print(f'num2={num2}') return num1 + num2 def callback(task_obj, result): print(f'task3 result={result}') output = task_obj.outputs output.custom = custom_output output.reset() output.to_custom() task_queue.put(Task(func=fun3, callback=callback, args=[2, 3])) print('task queue run') task_queue.run() print('do other things...') for i in range(0, 100): print(i * i) ``` ## wheel构建 ```bash python3.6 setup.py sdist bdist_wheel twine upload dist/* -u "用户名" -p "密码" ``` 下面分析一下几个关键部分的代码实现 ## 优先级队列 Python标准库已经有实现好的优先级队列了,但是当在相同优先级下传入同样的对象时,他会自动比较这些对象,不过我们的Task类没有实现相关的运算符重载,所以无法比较。 要解决这个问题,要不就在Task里面实现运算符重载,要不就是自己实现一个优先级队列,我选择后者。 ```python class PriorityQueue: """ 自己实现的优先级队列,使用了Python的堆 """ def __init__(self): self._index = 0 self.queue = [] def push(self, priority, val): heapq.heappush(self.queue, (priority, self._index, val)) self._index += 1 def pop(self): return heapq.heappop(self.queue)[-1] @property def empty(self): return len(self.queue) == 0 ``` ## 输出重定向 其实这个挺坑的了,毕竟是异步任务队列,难免涉及到线程的竞争问题,又没办法单独控制每个任务的输出,不过我还是做了,小项目的话还是可以用的(其实就是懒得移植一下已有代码) 代码如下,同样是使用队列来实现输出缓冲区,大小可以自己调整,默认支持输出到控制台、文件或者返回列表。 一般重定向输出的话需要自己实现可以输出处理函数,直接给Redirection类的custom 属性赋值即可。 ```python class Redirection: def __init__(self, buffer_size=512): self.buffer = Queue(maxsize=512) self._console = sys.stdout # 自定义的输出端 self.custom = None def write(self, output_stream): # 加入缓冲区队列 self.buffer.put(output_stream) def to_console(self): sys.stdout = self._console # 出列 while not self.buffer.empty(): print(self.buffer.get()) def to_file(self, file_path): with open(file_path, 'w+') as f: sys.stdout = f while not self.buffer.empty(): print(self.buffer.get()) f.close() def to_custom(self): while not self.buffer.empty(): self.custom(self.buffer.get()) def to_list(self): data = [] while not self.buffer.empty(): data.append(self.buffer.get()) return data def flush(self): self.buffer.empty() def reset(self): sys.stdout = self._console ``` ## Task类 没什么好说的,定义任务类,Task,代码如下: ```python class Task: def __init__(self, func, callback=None, priority=Priority.MIDDLE, args=(), kwargs={}): """ Args: func: 需要执行的函数 callback: 执行完的回调函数 priority: 优先级 *args: **kwargs: """ self._id = uuid.uuid4().hex self.function = func self.callback = callback self.priority = priority self.args = args self.kwargs = kwargs # 任务运行过程的输出,stdout的输出 self._outputs: Redirection = None @property def id(self): return self._id @property def outputs(self) -> Redirection: return self._outputs @outputs.setter def outputs(self, value: Redirection): self._outputs = value def run(self): try: if self.callback: # 回调函数原型 callback(task_obj, result) result = self.callback(self, self.function(*self.args, **self.kwargs)) else: result = self.function(*self.args, **self.kwargs) return result except Exception as e: if self.callback: result = self.callback(self, e) else: result = e return result ``` 使用起来很简单,如下: ```python def fun1(num1, num2): print(f'num1={num1}') print(f'num2={num2}') return num1 + num2 Task( func=fun1, callback=lambda task, result: print(f'task result: {result}'), ) # 也可以写成这样 Task( func=lambda num1, num2: num1 + num2, callback=lambda task, result: print(f'task result: {result}'), args=[2, 3] ) ``` ## 任务队列 最后是实现任务队列,也很简单,根据任务的优先级进行调度即可。 ```python class TaskQueue: """ 基于线程的异步任务队列 todo 下次做一个基于进程的队列,充分利用多核CPU性能 """ def __init__(self, output_redirect=False): self.queue = PriorityQueue() self.output_redirect = output_redirect self._redirect_objs = {} self._results = {} def put(self, task: Task): """ 将task加入任务列表 Args: task: Returns:返回task id """ self.queue.push(task.priority, task) return task.id def get(self): return self.queue.pop() def run(self): while not self.queue.empty: task = self.get() # 开启新线程 t = threading.Thread(target=self._task_wrapper, name=task.id, args=[task]) self._log(f'Start thread {task.id}') t.start() def get_output(self, task_id: str) -> Redirection: return self._redirect_objs.get(task_id, None) def get_result(self, task_id: str): return self._results.get(task_id, None) @staticmethod def _log(msg: str): """日志输出接口,可以替换为日志组件""" print(f'[TaskQueue] {msg}') def _task_wrapper(self, task: Task): if self.output_redirect: if task.id in self._redirect_objs: redirect_obj = self._redirect_objs[task.id] else: redirect_obj = Redirection(2048) self._redirect_objs[task.id] = redirect_obj # 重定向输出 sys.stdout = redirect_obj task.outputs = redirect_obj result = task.run() # 恢复默认输出 redirect_obj.reset() self._log(f'Task finished. {task.id}') else: result = task.run() # 保存结果 self._results[task.id] = result ``` 整个项目代码就在这了,很简单,也有很多不足的地方,不过小项目用用勉强还可以吧~ 过几天发布到pip,有需要的同学直接pip安装就可以使用,请关注博客更新。 ## 参考资料 - 运算符重载 [https://blog.csdn.net/goodlixueyong/article/details/52589979](https://blog.csdn.net/goodlixueyong/article/details/52589979) - [https://www.open-open.com/code/view/1433410658322](https://www.open-open.com/code/view/1433410658322) - Celery异步任务队列 [https://www.cnblogs.com/StitchSun/p/8552488.html](https://www.cnblogs.com/StitchSun/p/8552488.html) - https://zhuanlan.zhihu.com/p/37637660 ## 欢迎交流 我整理了一系列的技术文章和资料,在公众号「程序设计实验室」后台回复 linux、flutter、c#、netcore、android、kotlin、java、python 等可获取相关技术文章和资料,同时有任何问题都可以在公众号后台留言~