77use FOS \ElasticaBundle \Persister \InPlacePagerPersister ;
88use FOS \ElasticaBundle \Persister \PagerPersisterRegistry ;
99use FOS \ElasticaBundle \Provider \PagerProviderRegistry ;
10- use Interop \Queue \PsrContext ;
11- use Interop \Queue \PsrMessage ;
12- use Interop \Queue \PsrProcessor ;
10+ use Interop \Queue \Context ;
11+ use Interop \Queue \Message ;
12+ use Interop \Queue \Processor ;
1313use Enqueue \Util \JSON ;
1414
15- final class PopulateProcessor implements PsrProcessor , CommandSubscriberInterface, QueueSubscriberInterface
15+ final class PopulateProcessor implements Processor , CommandSubscriberInterface, QueueSubscriberInterface
1616{
17- /**
18- * @var PagerProviderRegistry
19- */
2017 private $ pagerProviderRegistry ;
2118
22- /**
23- * @var PagerPersisterRegistry
24- */
2519 private $ pagerPersisterRegistry ;
2620
2721 public function __construct (
@@ -32,10 +26,7 @@ public function __construct(
3226 $ this ->pagerProviderRegistry = $ pagerProviderRegistry ;
3327 }
3428
35- /**
36- * {@inheritdoc}
37- */
38- public function process (PsrMessage $ message , PsrContext $ context )
29+ public function process (Message $ message , Context $ context ): Result
3930 {
4031 if ($ message ->isRedelivered ()) {
4132 $ replyMessage = $ this ->createReplyMessage ($ context , $ message , 0 ,'The message was redelivered. Chances are that something has gone wrong. ' );
@@ -81,14 +72,7 @@ public function process(PsrMessage $message, PsrContext $context)
8172 }
8273 }
8374
84- /**
85- * @param PsrContext $context
86- * @param PsrMessage $message
87- * @param int $objectsCount
88- * @param \Throwable $e
89- * @return PsrMessage
90- */
91- private function createExceptionReplyMessage (PsrContext $ context , PsrMessage $ message , $ objectsCount , \Throwable $ e )
75+ private function createExceptionReplyMessage (Context $ context , Message $ message , int $ objectsCount , \Throwable $ e ): Message
9276 {
9377 $ errorMessage = sprintf (
9478 '<error>The queue processor has failed to process the message with exception: </error><comment>%s: %s in file %s at line %s.</comment> ' ,
@@ -101,15 +85,7 @@ private function createExceptionReplyMessage(PsrContext $context, PsrMessage $me
10185 return $ this ->createReplyMessage ($ context , $ message , $ errorMessage );
10286 }
10387
104- /**
105- * @param PsrContext $context
106- * @param PsrMessage $message
107- * @param int $objectsCount
108- * @param string|null $error
109- *
110- * @return PsrMessage
111- */
112- private function createReplyMessage (PsrContext $ context , PsrMessage $ message , $ objectsCount , $ error = null )
88+ private function createReplyMessage (Context $ context , Message $ message , int $ objectsCount , string $ error = null ): Message
11389 {
11490 $ replyMessage = $ context ->createMessage ($ message ->getBody (), $ message ->getProperties (), $ message ->getHeaders ());
11591 $ replyMessage ->setProperty ('fos-populate-objects-count ' , $ objectsCount );
@@ -121,23 +97,17 @@ private function createReplyMessage(PsrContext $context, PsrMessage $message, $o
12197 return $ replyMessage ;
12298 }
12399
124- /**
125- * {@inheritdoc}
126- */
127- public static function getSubscribedCommand ()
100+ public static function getSubscribedCommand (): array
128101 {
129102 return [
130- 'processorName ' => Commands::POPULATE ,
131- 'queueName ' => Commands::POPULATE ,
132- 'queueNameHardcoded ' => true ,
103+ 'command ' => Commands::POPULATE ,
104+ 'queue ' => Commands::POPULATE ,
105+ 'prefix_queue ' => false ,
133106 'exclusive ' => true ,
134107 ];
135108 }
136109
137- /**
138- * {@inheritdoc}
139- */
140- public static function getSubscribedQueues ()
110+ public static function getSubscribedQueues (): array
141111 {
142112 return [Commands::POPULATE ];
143113 }
0 commit comments