66use React \EventLoop \LoopInterface ;
77use RuntimeException ;
88use Rx \Observable ;
9- use function Rx \p ;
109use Rx \Scheduler ;
1110use Rx \Subject \Subject ;
1211use Rx \Websocket \WebsocketErrorException ;
1312use Throwable ;
13+ use function Rx \p ;
1414
1515final class AsyncClient
1616{
@@ -38,19 +38,32 @@ final class AsyncClient
3838 */
3939 private $ client ;
4040
41+ /**
42+ * @var string|null
43+ */
44+ private $ authEndpoint ;
45+
46+ /**
47+ * @var array|null
48+ */
49+ private $ authEndpointHeaders ;
50+
4151 /**
4252 * @var Observable
4353 */
4454 private $ connected ;
4555
4656 /**
47- * @internal
48- * @param Subject $client
57+ * @param Subject $client
58+ *
4959 * @throws \InvalidArgumentException
60+ * @internal
5061 */
51- public function __construct (Subject $ client )
62+ public function __construct (Subject $ client, string $ authEndpoint = null , array $ authEndpointHeaders = null )
5263 {
53- $ this ->client = $ client ;
64+ $ this ->client = $ client ;
65+ $ this ->authEndpoint = $ authEndpoint ;
66+ $ this ->authEndpointHeaders = $ authEndpointHeaders ;
5467
5568 /** @var Observable $events */
5669 $ events = $ client
@@ -77,17 +90,21 @@ public function __construct(Subject $client)
7790 }
7891
7992 /**
80- * @param LoopInterface $loop
81- * @param string $app Application ID
82- * @param Resolver $resolver Optional DNS resolver
83- * @throws \InvalidArgumentException
93+ * @param LoopInterface $loop
94+ * @param string $app Application ID
95+ * @param Resolver $resolver Optional DNS resolver
96+ *
8497 * @return AsyncClient
98+ * @throws \InvalidArgumentException
8599 */
86100 public static function create (
87101 LoopInterface $ loop ,
88102 string $ app ,
89103 Resolver $ resolver = null ,
90- string $ cluster = null
104+ string $ cluster = null ,
105+ string $ host = null ,
106+ string $ authEndpoint = null ,
107+ array $ authEndpointHeaders = null ,
91108 ): AsyncClient {
92109 try {
93110 Scheduler::setDefaultFactory (function () use ($ loop ) {
@@ -97,16 +114,19 @@ public static function create(
97114 }
98115
99116 return new self (
100- WebSocket::createFactory (ApiSettings::createUrl ($ app , $ cluster ), false , [], $ loop , $ resolver )
117+ WebSocket::createFactory (ApiSettings::createUrl ($ app , $ cluster , $ host ), false , [], $ loop , $ resolver ),
118+ $ authEndpoint ,
119+ $ authEndpointHeaders
101120 );
102121 }
103122
104123 /**
105124 * Listen on a channel.
106125 *
107- * @param string $channel Channel to listen on
108- * @throws \InvalidArgumentException
126+ * @param string $channel Channel to listen on
127+ *
109128 * @return Observable
129+ * @throws \InvalidArgumentException
110130 */
111131 public function channel (string $ channel ): Observable
112132 {
@@ -121,9 +141,15 @@ public function channel(string $channel): Observable
121141 });
122142
123143 $ subscribe = $ this ->connected
124- ->do (function () use ($ channel ): void {
144+ ->do (function (Event $ event ) use ($ channel ): void {
145+ $ authKey = $ channelData = null ;
146+
147+ if (str_starts_with ($ channel , 'private- ' ) || str_starts_with ($ channel , 'presence- ' )) {
148+ [$ authKey , $ channelData ] = $ this ->generateAuthToken ($ channel , $ event ->getData ()['socket_id ' ]);
149+ }
150+
125151 // Subscribe to pusher channel after connected
126- $ this ->send (Event::subscribeOn ($ channel ));
152+ $ this ->send (Event::subscribeOn ($ channel, $ authKey , $ channelData ));
127153 })
128154 ->flatMapTo (Observable::empty ());
129155
@@ -152,7 +178,6 @@ public function channel(string $channel): Observable
152178 * Send a message through the client.
153179 *
154180 * @param array $message Message to send, will be json encoded
155- *
156181 */
157182 public function send (array $ message ): void
158183 {
@@ -163,7 +188,8 @@ public function send(array $message): void
163188 * Returns an observable of TimeoutException.
164189 * The timeout observable will get cancelled every time a new event is received.
165190 *
166- * @param Observable $events
191+ * @param Observable $events
192+ *
167193 * @return Observable
168194 */
169195 private function timeout (Observable $ events ): Observable
@@ -184,20 +210,22 @@ private function timeout(Observable $events): Observable
184210 }
185211
186212 return Observable::never ()
187- ->timeout ($ time )
188- ->catch (function () use ($ time ) {
189- // ping (do something that causes incoming stream to get a message)
190- $ this ->send (Event::ping ());
191- // this timeout will actually timeout with a TimeoutException - causing
192- // everything above this to dispose
193- return Observable::never ()->timeout ($ time );
194- });
213+ ->timeout ($ time )
214+ ->catch (function () use ($ time ) {
215+ // ping (do something that causes incoming stream to get a message)
216+ $ this ->send (Event::ping ());
217+ // this timeout will actually timeout with a TimeoutException - causing
218+ // everything above this to dispose
219+ return Observable::never ()->timeout ($ time );
220+ });
195221 });
196222 }
197223
198224 /**
199225 * Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
200- * @param Throwable $throwable
226+ *
227+ * @param Throwable $throwable
228+ *
201229 * @return Observable
202230 */
203231 private function handleLowLevelError (Throwable $ throwable ): Observable
@@ -233,4 +261,44 @@ private function handleLowLevelError(Throwable $throwable): Observable
233261
234262 return Observable::timer ($ this ->delay );
235263 }
264+
265+ /**
266+ * @throws \Exception
267+ */
268+ private function generateAuthToken (string $ channel , string $ socketId ): array
269+ {
270+ if (!$ this ->authEndpoint ) {
271+ throw new \Exception ('No auth endpoint is configured to connect private or presence channel. ' );
272+ }
273+
274+ $ curl = curl_init ();
275+
276+ curl_setopt_array ($ curl , [
277+ CURLOPT_URL => $ this ->authEndpoint ,
278+ CURLOPT_RETURNTRANSFER => true ,
279+ CURLOPT_MAXREDIRS => 10 ,
280+ CURLOPT_TIMEOUT => 5 ,
281+ CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1 ,
282+ CURLOPT_CUSTOMREQUEST => 'POST ' ,
283+ CURLOPT_POSTFIELDS => ['channel_name ' => $ channel , 'socket_id ' => $ socketId , 'user_data ' => []],
284+ CURLOPT_HTTPHEADER => $ this ->authEndpointHeaders ,
285+ ]);
286+
287+ $ response = curl_exec ($ curl );
288+ $ responseCode = curl_getinfo ($ curl , CURLINFO_RESPONSE_CODE );
289+
290+ curl_close ($ curl );
291+
292+ if ($ responseCode !== 200 ) {
293+ throw new \Exception ('Can \'t generate auth token for ' . $ channel . '. Response code ' . $ responseCode );
294+ }
295+
296+ $ response = json_decode ($ response , true );
297+
298+ if (!isset ($ response ['auth ' ])) {
299+ throw new \Exception ('Invalid response for auth token. ' );
300+ }
301+
302+ return [$ response ['auth ' ], $ response ['channel_data ' ] ?? null ];
303+ }
236304}
0 commit comments