Spaces:
Sleeping
Sleeping
| from time import sleep | |
| import uuid | |
| import pytest | |
| from multiprocessing import Pool | |
| from unittest.mock import Mock, patch | |
| from threading import Thread | |
| from ding.utils import WatchDog | |
| from ding.framework.message_queue.redis import RedisMQ | |
def redis_main(i):
    """Run one RedisMQ scenario against a mocked ``redis.Redis`` client.

    ``redis.Redis`` is patched with a mock whose pubsub object returns, on
    every ``get_message`` call, a message on channel ``b"t"`` carrying
    ``node_id0 + b"::data"``.

    * ``i == 0``: the queue's ``_id`` is overwritten with ``node_id0``, five
      messages are published, and ``recv`` is expected not to return. A
      ``TimeoutError`` is expected to be raised while the watchdog is started
      and the receiver thread is being joined.
    * otherwise: ``recv`` is expected to return topic ``"t"`` with payload
      ``b"data"``.

    Args:
        i: Scenario selector; ``test_redis`` maps this function over
            ``range(2)``.

    Raises:
        AssertionError: If a published or received message does not match the
            expected topic or payload.
    """
    node_id0 = uuid.uuid4().hex.encode()

    # Stand-in for ``redis.Redis`` that checks whatever gets published.
    class MockRedis(Mock):

        def publish(self, topic, data):
            assert topic == "t"
            # NOTE(review): presumably RedisMQ frames outgoing data as
            # b"<sender id>::<payload>" -- confirm against RedisMQ.publish.
            assert b"::" in data

        def pubsub(self):
            return MockPubSub()

    # Pubsub stand-in that returns the same message on every poll.
    class MockPubSub(Mock):

        def get_message(self, **kwargs):
            return {"channel": b"t", "data": node_id0 + b"::data"}

    with patch("redis.Redis", MockRedis):
        host = "127.0.0.1"
        port = 6379
        mq = RedisMQ(redis_host=host, redis_port=port)
        mq.listen()
        if i == 0:
            # Give the queue the same id that prefixes every mocked message.
            mq._id = node_id0

            def send_message():
                for _ in range(5):
                    mq.publish("t", b"data")
                    sleep(0.1)

            def recv_message():
                # Should not receive any message
                mq.subscribe("t")
                print("RECV", mq.recv())

            send_thread = Thread(target=send_message, daemon=True)
            recv_thread = Thread(target=recv_message, daemon=True)
            send_thread.start()
            recv_thread.start()

            send_thread.join()

            watchdog = WatchDog(1)
            try:
                with pytest.raises(TimeoutError):
                    watchdog.start()
                    recv_thread.join()
            finally:
                # Always stop the watchdog, even when the expected
                # TimeoutError did not occur and pytest.raises fails.
                watchdog.stop()
        else:
            mq.subscribe("t")
            topic, msg = mq.recv()
            assert topic == "t"
            assert msg == b"data"
def test_redis():
    """Run ``redis_main`` once for index 0 and once for index 1 in a two-process pool."""
    worker_indices = range(2)
    with Pool(processes=2) as workers:
        workers.map(redis_main, worker_indices)