Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions js-src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ export class Channel extends BaseChannel implements PresenceChannel {
}

here(callback: Function): Channel {
// TODO: implement
this.on('subscription_succeeded', (data) => {
callback(data)
})

return this
}
Expand All @@ -126,7 +128,9 @@ export class Channel extends BaseChannel implements PresenceChannel {
* Listen for someone joining the channel.
*/
joining(callback: Function): Channel {
// TODO: implement
this.on('member_added', (data) => {
callback(data)
})

return this
}
Expand All @@ -135,7 +139,9 @@ export class Channel extends BaseChannel implements PresenceChannel {
* Listen for someone leaving the channel.
*/
leaving(callback: Function): Channel {
// TODO: implement
this.on('member_removed', (data) => {
callback(data)
})

return this
}
Expand Down
8 changes: 6 additions & 2 deletions src/ConnectionRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aws\ApiGatewayManagementApi\ApiGatewayManagementApiClient;
use Aws\ApiGatewayManagementApi\Exception\ApiGatewayManagementApiException;
use GuzzleHttp\Exception\ClientException;
use Symfony\Component\HttpFoundation\Response;

class ConnectionRepository
{
Expand All @@ -28,8 +29,11 @@ public function sendMessage(string $connectionId, string $data): void
'Data' => $data,
]);
} catch (ApiGatewayManagementApiException $e) {
// GoneException: The connection with the provided id no longer exists.
if ($e->getAwsErrorCode() === 'GoneException') {
// GoneException: The connection with the provided id no longer exists.
if (
$e->getStatusCode() === Response::HTTP_GONE ||
$e->getAwsErrorCode() === 'GoneException'
) {
$this->subscriptionRepository->clearConnection($connectionId);

return;
Expand Down
64 changes: 61 additions & 3 deletions src/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Bref\Event\Http\HttpResponse;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Symfony\Component\HttpFoundation\Response;
use Throwable;

class Handler extends WebsocketHandler
Expand Down Expand Up @@ -42,6 +43,7 @@ public function handleWebsocket(WebsocketEvent $event, Context $context): HttpRe

protected function handleDisconnect(WebsocketEvent $event, Context $context): void
{
$this->sendPresenceDisconnectNotices($event);
$this->subscriptionRepository->clearConnection($event->getConnectionId());
}

Expand Down Expand Up @@ -115,12 +117,28 @@ protected function subscribe(WebsocketEvent $event, Context $context): void
}
}

$this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel);
if (Str::startsWith($channel, 'presence-')) {
$this->subscriptionRepository->subscribeToPresenceChannel(
$event->getConnectionId(),
$channelData,
$channel
);
$data = $this->subscriptionRepository->getUserListForPresenceChannel($channel)
->transform(function ($user) {
$user = json_decode($user, true);
return Arr::get($user, 'user_info', json_encode($user));
})
->toArray();
$this->sendPresenceAdd($event, $channel, Arr::get(json_decode($channelData, true), 'user_info'));
} else {
$this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel);
$data = [];
}

$this->sendMessage($event, $context, [
'event' => 'subscription_succeeded',
'channel' => $channel,
'data' => [],
'data' => $data,
]);
}

Expand All @@ -138,6 +156,18 @@ protected function unsubscribe(WebsocketEvent $event, Context $context): void
]);
}

public function sendPresenceDisconnectNotices(WebsocketEvent $event): void
{
$channels = $this->subscriptionRepository->getChannelsSubscribedToByConnectionId($event->getConnectionId());
$channels->filter(function ($info) {
return Str::startsWith(Arr::get($info, 'channel'), 'presence-');
})->each(function ($info) use ($event) {
$channel = Arr::get($info, 'channel');
$userData = json_decode(Arr::get($info, 'userData'), true);
$this->sendPresenceRemove($event, $channel, Arr::get($userData, 'user_info'));
});
}

public function broadcastToChannel(WebsocketEvent $event, Context $context): void
{
$skipConnectionId = $event->getConnectionId();
Expand All @@ -158,6 +188,34 @@ public function broadcastToChannel(WebsocketEvent $event, Context $context): voi
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendPresenceAdd(WebsocketEvent $event, string $channel, array $data): void
{
$skipConnectionId = $event->getConnectionId();
$eventBody = json_decode($event->getBody(), true);
$data = json_encode([
'event'=>'member_added',
'channel'=>$channel,
'data'=>$data
]) ?: '';
$this->subscriptionRepository->getConnectionIdsForChannel($channel)
->reject(fn ($connectionId) => $connectionId === $skipConnectionId)
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendPresenceRemove(WebsocketEvent $event, string $channel, array $data): void
{
$skipConnectionId = $event->getConnectionId();
$eventBody = json_decode($event->getBody(), true);
$data = json_encode([
'event'=>'member_removed',
'channel'=>$channel,
'data'=>$data
]) ?: '';
$this->subscriptionRepository->getConnectionIdsForChannel($channel)
->reject(fn ($connectionId) => $connectionId === $skipConnectionId)
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendMessage(WebsocketEvent $event, Context $context, array $data): void
{
$this->connectionRepository->sendMessage($event->getConnectionId(), json_encode($data, JSON_THROW_ON_ERROR));
Expand All @@ -168,7 +226,7 @@ protected function sendMessageToConnection(string $connectionId, string $data):
try {
$this->connectionRepository->sendMessage($connectionId, $data);
} catch (ApiGatewayManagementApiException $exception) {
if ($exception->getAwsErrorCode() === 'GoneException') {
if ($exception->getStatusCode() === Response::HTTP_GONE) {
$this->subscriptionRepository->clearConnection($connectionId);
return;
}
Expand Down
49 changes: 49 additions & 0 deletions src/SubscriptionRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,43 @@ public function getConnectionIdsForChannel(string ...$channels): Collection
->unique();
}

public function getUserListForPresenceChannel(string ...$channels): Collection
{
$promises = collect($channels)->map(fn ($channel) => $this->dynamoDb->queryAsync([
'TableName' => $this->table,
'IndexName' => 'lookup-by-channel',
'KeyConditionExpression' => 'channel = :channel',
'ExpressionAttributeValues' => [
':channel' => ['S' => $channel],
],
]))->toArray();

$responses = Utils::all($promises)->wait();

return collect($responses)
->flatmap(fn (\Aws\Result $result): array => $result['Items'])
->map(fn (array $item): string => Arr::get($item, 'userData.S', ''))
->unique();
}

public function getChannelsSubscribedToByConnectionId(string $connectionId): Collection
{
$response = $this->dynamoDb->query([
'TableName' => $this->table,
'KeyConditionExpression' => 'connectionId = :connectionId',
'ExpressionAttributeValues' => [
':connectionId' => ['S' => $connectionId],
],
]);
return collect(Arr::get($response, 'Items', []))
->transform(function ($item) {
return [
'channel'=>Arr::get($item, 'channel.S'),
'userData'=>Arr::get($item, 'userData.S'),
];
});
}

public function clearConnection(string $connectionId): void
{
$response = $this->dynamoDb->query([
Expand Down Expand Up @@ -86,4 +123,16 @@ public function unsubscribeFromChannel(string $connectionId, string $channel): v
],
]);
}

public function subscribeToPresenceChannel(string $connectionId, string $userData, string $channel): void
{
$this->dynamoDb->putItem([
'TableName' => $this->table,
'Item' => [
'connectionId' => ['S' => $connectionId],
'userData' => ['S' => $userData],
'channel' => ['S' => $channel],
],
]);
}
}
81 changes: 80 additions & 1 deletion tests/HandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use GuzzleHttp\Psr7\Response;
use Mockery\Mock;
use Psr\Http\Message\RequestInterface;
use Symfony\Component\HttpFoundation\Response as SymfonyResponse;

it('can subscribe to open channels', function () {
app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) {
Expand Down Expand Up @@ -113,7 +114,85 @@
], $context);
});

it('handles dropped connections', function () {
it('leaves presence channels', function () {
app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('getChannelsSubscribedToByConnectionId')->withArgs(function (string $connectionId): bool {
return $connectionId === 'connection-id-1';
})->once()
->andReturn(collect([
[
'channel'=>'presence-channel',
'userData'=>json_encode(['user_info'=>['the user info']]),
],
[
'channel'=>'other-channel',
]
]));
$mock->shouldReceive('getConnectionIdsForChannel')->withArgs(function (string $channel) {
return $channel === 'presence-channel';
})->once()
->andReturn(collect(['connection-id-1', 'connection-id-2']));
$mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId) {
return $connectionId === 'connection-id-1';
})->once();
}));

app()->instance(ConnectionRepository::class, Mockery::mock(ConnectionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('sendMessage')->withArgs(function (string $connectionId, string $data): bool {
return $connectionId === 'connection-id-2' and $data === '{"event":"member_removed","channel":"presence-channel","data":["the user info"]}';
})->once();
}));

/** @var Handler $handler */
$handler = app(Handler::class);

$context = new Context('request-id-1', 50_000, 'function-arn', 'trace-id-1');

$handler->handle([
'requestContext' => [
'routeKey' => 'my-test-route-key',
'eventType' => 'DISCONNECT',
'connectionId' => 'connection-id-1',
'domainName' => 'test-domain',
'apiId' => 'api-id-1',
'stage' => 'stage-test',
],
'body' => json_encode(['event' => 'disconnect']),
], $context);
});

it('handles dropped connections with HTTP_GONE', function () {
$mock = new MockHandler();

$mock->append(function (CommandInterface $cmd, RequestInterface $req) {
$mock = Mockery::mock(SymfonyResponse::class, function ($mock) {
$mock->shouldReceive('getStatusCode')
->andReturn(SymfonyResponse::HTTP_GONE);
});
return new ApiGatewayManagementApiException('', $cmd, [
'response' => $mock
]);
});

/** @var SubscriptionRepository */
$subscriptionRepository = Mockery::mock(SubscriptionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId): bool {
return $connectionId === 'dropped-connection-id-1234';
})->once();
});

$config = config('laravel-echo-api-gateway');

/** @var ConnectionRepository */
$connectionRepository = new ConnectionRepository($subscriptionRepository, array_merge_recursive(['connection' => ['handler' => $mock]], $config));

$connectionRepository->sendMessage('dropped-connection-id-1234', 'test-message');
});

it('handles dropped connections with GoneException', function () {
$mock = new MockHandler();

$mock->append(function (CommandInterface $cmd, RequestInterface $req) {
Expand Down