
Integration Testing Multiple Celery Workers and a DB-Backed Django API


I'm working with a service-oriented architecture that has multiple Celery workers (let's call them worker1, worker2, and worker3). All three workers are separate entities (i.e., separate code bases, separate repos, separate Celery instances, separate machines) and none of them are connected to a Django app.


Communicating with each of these three workers is a Django-based, MySQL-backed RESTful API.


In development, these services all run on a single Vagrant box, each acting as a separate machine running off of a separate port. We have a single RabbitMQ broker for all of the Celery tasks.


A typical path through these services might look something like this: worker1 gets a message from a device, does some processing, and queues up a task on worker2; worker2 does further processing and makes a POST to the API, which writes to the MySQL DB and triggers a task on worker3; worker3 does some other processing and makes another POST to the API, which results in another MySQL write.


The services are communicating nicely, but it's very annoying to test this flow every time we make a change to any service. I really want to get some full integration tests in place (i.e., starting with a message sent to worker1 and going through the entire chain), but I'm not sure where to start. The main problems I'm facing are these:


If I queue up something on worker1, how can I possibly tell when the whole flow is over? How can I make reasonable assertions about results when I don't know if the results have even arrived?


How do I deal with DB set up/tear down? I want to delete all of the entries made during a test at the end of each test, but if I'm starting the test from outside of the Django app, I'm not sure how to efficiently clear it out. Manually deleting it and recreating it after every test seems like it might be too much overhead.


2 Solutions

#1



Celery allows tasks to be run synchronously, so the first step is: divide the whole flow into separate tasks, fake the requests, and assert the results:

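One way to get that synchronous behaviour in tests is Celery's eager mode, which executes tasks locally instead of sending them to the broker. A minimal configuration sketch (setting names per Celery 4+; the app name is hypothetical):

```python
from celery import Celery

app = Celery('worker1')
# In the test settings only: run every .delay()/.apply_async() call
# immediately in the calling process, bypassing the broker entirely.
app.conf.task_always_eager = True
# Re-raise exceptions from eagerly executed tasks so the test fails loudly.
app.conf.task_eager_propagates = True
```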

Original flow:


device --- worker1 --- worker2 --- django --- worker3 --- django

First-level integration tests:


1.      |- worker1 -|
2.                  |- worker2 -|
3.                              |- django -|
4.                                         |- worker3 -|
5.                                                     |- django -|

For each test, create a fake request or synchronous call and assert the results. Place these tests in the corresponding repository. For example, in a test for worker1, you can mock worker2 and verify that it was called with the proper arguments. Then, in another test, you call worker2 and mock the request to check that it calls the right API. And so on.

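A worker1-level test might look roughly like this. All names here are hypothetical and the worker code is reduced to a plain function so the mocking pattern stands out; in a real test you would patch the actual task signature used to enqueue work on worker2:

```python
from unittest import mock

# Hypothetical worker1 code: process a device message, then hand off to worker2.
def process_device_message(message):
    payload = {'device_id': message['device_id'], 'value': message['raw'] * 2}
    queue_worker2_task(payload)  # in real code, e.g. worker2_task.apply_async(...)
    return payload

def queue_worker2_task(payload):
    raise NotImplementedError  # the real implementation talks to the broker

# The test: mock the hand-off and assert worker1 called it with the right args.
with mock.patch(f'{__name__}.queue_worker2_task') as fake_queue:
    result = process_device_message({'device_id': 'abc', 'raw': 21})
    fake_queue.assert_called_once_with({'device_id': 'abc', 'value': 42})
```

The same shape repeats down the chain: call one service for real, patch the boundary to the next one, and assert on the arguments that crossed it.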

Testing the whole flow will be difficult, since all tasks are separate entities. The only way I've come up with so far is to make one fake call to worker1, set a reasonable timeout, and wait for the final result in the database. This kind of test only tells you whether it works or not; it won't show you where the problem is.

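For the wait-with-a-timeout part, a small polling helper is usually enough: poll the database (or any predicate) until it returns something or a deadline passes. A stdlib-only sketch; the `fake_db` list at the bottom is just a stand-in for a real DB query:

```python
import time

def wait_for(predicate, timeout=30.0, interval=0.5):
    """Poll `predicate` until it returns a truthy value or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = predicate()
        if result:
            return result
        time.sleep(interval)
    raise TimeoutError(f'condition not met within {timeout} seconds')

# In a real test the predicate would query MySQL for the row that worker3's
# final POST should have created, e.g.:
#   wait_for(lambda: Reading.objects.filter(device_id='abc').first(), timeout=60)
fake_db = []
fake_db.append('row-from-worker3')  # simulate the write landing
row = wait_for(lambda: fake_db[0] if fake_db else None, timeout=5, interval=0.1)
```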

#2



To work with the full setup, you can set up a Celery result backend. See the Celery 'next steps' documentation for the basics.


worker1 could then report the task handle of what it has passed on to worker2. The result returned by worker2 would be the task id of what it has passed on to worker3. And the result returned by worker3 would mean the whole sequence is finished, so you can check the outcomes. The results could also report interesting bits of those outcomes right away to make the checking easier.

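Concretely, each worker's app needs a result backend configured before `.get()` has anywhere to read results from. A minimal sketch; the broker URL and the backend choice here are assumptions, and any backend from the Celery docs works:

```python
from celery import Celery

# Each worker's app must point at a result backend so callers can .get()
# task results; the broker alone is not enough.
app = Celery(
    'worker1',
    broker='amqp://guest@localhost//',  # the shared RabbitMQ broker
    backend='rpc://',                   # or e.g. 'redis://localhost/0'
)
```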

This might look somewhat like this in Celery:


from celery.result import AsyncResult

worker1_result = mytask.delay(someargs)               # executed by worker1
worker2_task_id = worker1_result.get()                # waits for worker1; it returns the id of the task it queued on worker2
worker3_task_id = AsyncResult(worker2_task_id).get()  # waits for worker2; it returns the id of the task it queued on worker3
outcome = AsyncResult(worker3_task_id).get()          # waits for worker3 to finish

(The details probably need to be different; I have not used this myself yet. I am not sure whether task results are serializable and hence themselves suitable as task function return values.)


