|
12 | 12 | namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; |
13 | 13 |
|
14 | 14 | use PHPUnit\Framework\TestCase; |
15 | | -use Symfony\Component\Messenger\Envelope; |
16 | 15 | use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; |
17 | 16 | use Symfony\Component\Messenger\Transport\RedisExt\Connection; |
18 | | -use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver; |
19 | | -use Symfony\Component\Messenger\Transport\RedisExt\RedisSender; |
20 | | -use Symfony\Component\Messenger\Transport\Serialization\Serializer; |
21 | | -use Symfony\Component\Process\PhpProcess; |
22 | | -use Symfony\Component\Process\Process; |
23 | | -use Symfony\Component\Serializer as SerializerComponent; |
24 | | -use Symfony\Component\Serializer\Encoder\JsonEncoder; |
25 | | -use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; |
26 | 17 |
|
27 | 18 | /** |
28 | 19 | * @requires extension redis |
29 | 20 | */ |
30 | 21 | class RedisExtIntegrationTest extends TestCase |
31 | 22 | { |
| 23 | + private $redis; |
| 24 | + private $connection; |
| 25 | + |
32 | 26 | protected function setUp() |
33 | 27 | { |
34 | | - parent::setUp(); |
35 | | - |
36 | 28 | if (!getenv('MESSENGER_REDIS_DSN')) { |
37 | 29 | $this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.'); |
38 | 30 | } |
39 | | - } |
40 | | - |
41 | | - public function testItSendsAndReceivesMessages() |
42 | | - { |
43 | | - $serializer = new Serializer( |
44 | | - new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) |
45 | | - ); |
46 | | - |
47 | | - $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN')); |
48 | | - |
49 | | - $sender = new RedisSender($connection, $serializer); |
50 | | - $receiver = new RedisReceiver($connection, $serializer); |
51 | 31 |
|
52 | | - $sender->send($first = Envelope::wrap(new DummyMessage('First'))); |
53 | | - $sender->send($second = Envelope::wrap(new DummyMessage('Second'))); |
54 | | - |
55 | | - $receivedMessages = 0; |
56 | | - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { |
57 | | - $this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope); |
58 | | - |
59 | | - if (2 === ++$receivedMessages) { |
60 | | - $receiver->stop(); |
61 | | - } |
62 | | - }); |
| 32 | + $this->redis = new \Redis(); |
| 33 | + $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis); |
| 34 | + $this->clearRedis(); |
| 35 | + $this->connection->setup(); |
63 | 36 | } |
64 | 37 |
|
65 | | - public function testItReceivesSignals() |
| 38 | + public function testConnectionSendAndGet() |
66 | 39 | { |
67 | | - $serializer = new Serializer( |
68 | | - new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) |
69 | | - ); |
70 | | - |
71 | | - $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN')); |
72 | | - |
73 | | - $sender = new RedisSender($connection, $serializer); |
74 | | - $sender->send(Envelope::wrap(new DummyMessage('Hello'))); |
75 | | - |
76 | | - $amqpReadTimeout = 30; |
77 | | - $dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout; |
78 | | - $process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array( |
79 | | - 'COMPONENT_ROOT' => __DIR__.'/../../../', |
80 | | - 'DSN' => $dsn, |
81 | | - )); |
82 | | - |
83 | | - $process->start(); |
84 | | - |
85 | | - $this->waitForOutput($process, $expectedOutput = "Receiving messages...\n"); |
86 | | - |
87 | | - $signalTime = microtime(true); |
88 | | - $timedOutTime = time() + 10; |
89 | | - |
90 | | - $process->signal(15); |
91 | | - |
92 | | - while ($process->isRunning() && time() < $timedOutTime) { |
93 | | - usleep(100 * 1000); // 100ms |
94 | | - } |
95 | | - |
96 | | - $this->assertFalse($process->isRunning()); |
97 | | - $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); |
98 | | - $this->assertSame($expectedOutput.<<<'TXT' |
99 | | -Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage |
100 | | -with items: [ |
101 | | - "Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage" |
102 | | -] |
103 | | -Done. |
104 | | - |
105 | | -TXT |
106 | | - , $process->getOutput()); |
| 40 | + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]); |
| 41 | + $encoded = $this->connection->get(); |
| 42 | + $this->assertEquals('{"message": "Hi"}', $encoded['body']); |
| 43 | + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); |
107 | 44 | } |
108 | 45 |
|
109 | | - /** |
110 | | - * @runInSeparateProcess |
111 | | - */ |
112 | | - public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() |
| 46 | + public function testGetTheFirstAvailableMessage() |
113 | 47 | { |
114 | | - $serializer = new Serializer( |
115 | | - new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) |
116 | | - ); |
117 | | - |
118 | | - $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1')); |
119 | | - |
120 | | - $receiver = new RedisReceiver($connection, $serializer); |
121 | | - |
122 | | - $receivedMessages = 0; |
123 | | - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { |
124 | | - $this->assertNull($envelope); |
125 | | - |
126 | | - if (2 === ++$receivedMessages) { |
127 | | - $receiver->stop(); |
128 | | - } |
129 | | - }); |
| 48 | + $this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]); |
| 49 | + $this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]); |
| 50 | + $encoded = $this->connection->get(); |
| 51 | + $this->assertEquals('{"message": "Hi1"}', $encoded['body']); |
| 52 | + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); |
| 53 | + $encoded = $this->connection->get(); |
| 54 | + $this->assertEquals('{"message": "Hi2"}', $encoded['body']); |
| 55 | + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); |
130 | 56 | } |
131 | 57 |
|
132 | | - private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) |
| 58 | + private function clearRedis() |
133 | 59 | { |
134 | | - $timedOutTime = time() + $timeoutInSeconds; |
135 | | - |
136 | | - while (time() < $timedOutTime) { |
137 | | - if (0 === strpos($process->getOutput(), $output)) { |
138 | | - return; |
139 | | - } |
140 | | - |
141 | | - usleep(100 * 1000); // 100ms |
142 | | - } |
143 | | - |
144 | | - throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.'); |
| 60 | + $parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN')); |
| 61 | + $pathParts = explode('/', $parsedUrl['path'] ?? ''); |
| 62 | + $stream = $pathParts[1] ?? 'symfony'; |
| 63 | + $this->redis->del($stream); |
145 | 64 | } |
146 | 65 | } |
0 commit comments