Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Basic/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Laudis\Neo4j\Contracts\AuthenticateInterface;
use Laudis\Neo4j\Contracts\DriverInterface;
use Laudis\Neo4j\Databags\DriverConfiguration;
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\DriverFactory;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
Expand Down Expand Up @@ -44,6 +45,11 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
return $this->driver->verifyConnectivity($config);
}

public function getServerInfo(?SessionConfiguration $config = null): ServerInfo
{
return $this->driver->getServerInfo($config);
}

public static function create(string|UriInterface $uri, ?DriverConfiguration $configuration = null, ?AuthenticateInterface $authenticate = null): self
{
$driver = DriverFactory::create($uri, $configuration, $authenticate, SummarizedResultFormatter::create());
Expand Down
42 changes: 27 additions & 15 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class BoltConnection implements ConnectionInterface
*/
private array $subscribedResults = [];
private bool $inTransaction = false;
/** @var array<string, bool> Track if this connection was ever used for a query */
private array $connectionUsed = [
'reader' => false,
'writer' => false,
];

/**
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
Expand Down Expand Up @@ -199,13 +204,6 @@ public function reset(): void
$this->subscribedResults = [];
}

private function prepareForBegin(): void
{
if (in_array($this->getServerState(), ['STREAMING', 'TX_STREAMING'], true)) {
$this->discardUnconsumedResults();
}
}

/**
* Begins a transaction.
*
Expand Down Expand Up @@ -254,6 +252,12 @@ public function run(
?AccessMode $mode,
?iterable $tsxMetadata,
): array {
if ($mode === AccessMode::WRITE()) {
$this->connectionUsed['writer'] = true;
} else {
$this->connectionUsed['reader'] = true;
}

if ($this->isInTransaction()) {
$extra = [];
} else {
Expand Down Expand Up @@ -321,17 +325,17 @@ public function __destruct()

public function close(): void
{
try {
if ($this->isOpen()) {
if ($this->isStreaming()) {
$this->discardUnconsumedResults();
}
if ($this->isOpen()) {
if ($this->isStreaming() && (($this->connectionUsed['reader'] ?? false) || ($this->connectionUsed['writer'] ?? false))) {
$this->discardUnconsumedResults();
}

if (($this->connectionUsed['reader'] ?? false) || ($this->connectionUsed['writer'] ?? false)) {
$message = $this->messageFactory->createGoodbyeMessage();
$message->send();

unset($this->boltProtocol); // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;
}
} catch (Throwable) {

unset($this->boltProtocol);
}
}

Expand Down Expand Up @@ -437,6 +441,7 @@ public function assertNoFailure(Response $response): void
public function discardUnconsumedResults(): void
{
$this->logger?->log(LogLevel::DEBUG, 'Discarding unconsumed results');

$this->subscribedResults = array_values(array_filter(
$this->subscribedResults,
static fn (WeakReference $ref): bool => $ref->get() !== null
Expand All @@ -451,6 +456,13 @@ public function discardUnconsumedResults(): void
$state = $this->getServerState();
$this->logger?->log(LogLevel::DEBUG, "Server state before discard: {$state}");

if (!($this->connectionUsed['reader'] ?? false) && !($this->connectionUsed['writer'] ?? false)) {
$this->logger?->log(LogLevel::DEBUG, 'Skipping discard - connection never used');
$this->subscribedResults = [];

return;
}

try {
if (in_array($state, ['STREAMING', 'TX_STREAMING'], true)) {
$this->discard(null);
Expand Down
33 changes: 33 additions & 0 deletions src/Bolt/BoltDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
use Laudis\Neo4j\Contracts\DriverInterface;
use Laudis\Neo4j\Contracts\SessionInterface;
use Laudis\Neo4j\Databags\DriverConfiguration;
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
Expand Down Expand Up @@ -97,6 +99,37 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
return true;
}

/**
* Gets server information without running a query.
*
* Acquires a connection from the pool and extracts server metadata.
* The pool handles all connection management, routing, and retries.
*
* @throws Exception if unable to acquire a connection
*/
public function getServerInfo(?SessionConfiguration $config = null): ServerInfo
{
$config ??= SessionConfiguration::default()->withAccessMode(AccessMode::READ());

$connectionGenerator = $this->pool->acquire($config);
/**
* @var BoltConnection $connection
*
* @psalm-suppress UnnecessaryVarAnnotation
*/
$connection = GeneratorHelper::getReturnFromGenerator($connectionGenerator);

try {
return new ServerInfo(
$connection->getServerAddress(),
$connection->getProtocol(),
$connection->getServerAgent()
);
} finally {
$this->pool->release($connection);
}
}

public function closeConnections(): void
{
$this->pool->close();
Expand Down
2 changes: 0 additions & 2 deletions src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ private function acquireConnection(TransactionConfiguration $config, SessionConf
*/
$connection = GeneratorHelper::getReturnFromGenerator($connectionGenerator);

// We try and let the server do the timeout management.
// Since the client should not run indefinitely, we just add the client side by two, just in case
$timeout = $config->getTimeout();
if ($timeout !== null) {
$timeout = ($timeout < 30) ? 30 : $timeout;
Expand Down
9 changes: 9 additions & 0 deletions src/Contracts/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Laudis\Neo4j\Contracts;

use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Types\CypherList;
use Laudis\Neo4j\Types\CypherMap;
Expand All @@ -35,6 +36,14 @@ public function createSession(?SessionConfiguration $config = null): SessionInte
*/
public function verifyConnectivity(?SessionConfiguration $config = null): bool;

/**
* Gets server information without running a query.
*
* Acquires a connection from the pool and extracts server metadata.
* The pool handles all connection management, routing, and retries.
*/
public function getServerInfo(?SessionConfiguration $config = null): ServerInfo;

/**
* Closes all connections in the pool.
*/
Expand Down
18 changes: 18 additions & 0 deletions src/Databags/SessionConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,27 @@ public function __construct(
private readonly ?AccessMode $accessMode = null,
private readonly ?array $bookmarks = null,
private readonly ?Neo4jLogger $logger = null,
private readonly ?string $impersonatedUser = null,
) {
}

public function withImpersonatedUser(?string $user): self
{
return new self(
$this->database,
$this->fetchSize,
$this->accessMode,
$this->bookmarks,
$this->logger,
$user
);
}

public function getImpersonatedUser(): ?string
{
return $this->impersonatedUser;
}

/**
* @pure
*
Expand Down
14 changes: 14 additions & 0 deletions src/Neo4j/Neo4jConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ private function getNextServer(RoutingTable $table, AccessMode $mode): Uri
$servers = $table->getWithRole(RoutingRoles::FOLLOWER());
}

if (empty($servers)) {
throw new RuntimeException('No available servers found for the requested access mode');
}

return Uri::create($servers[random_int(0, count($servers) - 1)]);
}

Expand Down Expand Up @@ -259,6 +263,16 @@ public function close(): void
$this->cache->clear();
}

/**
* Forces a routing table refresh for the given configuration.
* This will cause the next acquire() call to fetch a fresh routing table.
*/
public function refreshRoutingTable(SessionConfiguration $config): void
{
$key = $this->createKey($this->data, $config);
$this->cache->delete($key);
}

/**
* @return Generator<string>
*/
Expand Down
38 changes: 38 additions & 0 deletions src/Neo4j/Neo4jDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use function is_string;

use Laudis\Neo4j\Authentication\Authenticate;
use Laudis\Neo4j\Bolt\BoltConnection;
use Laudis\Neo4j\Bolt\Session;
use Laudis\Neo4j\Common\DNSAddressResolver;
use Laudis\Neo4j\Common\GeneratorHelper;
Expand All @@ -28,7 +29,9 @@
use Laudis\Neo4j\Contracts\DriverInterface;
use Laudis\Neo4j\Contracts\SessionInterface;
use Laudis\Neo4j\Databags\DriverConfiguration;
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
Expand Down Expand Up @@ -75,6 +78,8 @@ public static function create(string|UriInterface $uri, ?DriverConfiguration $co
/**
* @psalm-mutation-free
*
* @psalm-suppress UnnecessaryVarAnnotation
*
* @throws Exception
*/
public function createSession(?SessionConfiguration $config = null): SessionInterface
Expand All @@ -99,6 +104,39 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
return true;
}

/**
* Gets server information without running a query.
*
* Acquires a connection from the pool and extracts server metadata.
* The pool handles all connection management, routing, and retries.
*
* @throws Exception if unable to acquire a connection
*/
public function getServerInfo(?SessionConfiguration $config = null): ServerInfo
{
$config ??= SessionConfiguration::default()->withAccessMode(AccessMode::READ());

$this->pool->refreshRoutingTable($config);

$connectionGenerator = $this->pool->acquire($config);
/**
* @var BoltConnection $connection
*
* @psalm-suppress UnnecessaryVarAnnotation
*/
$connection = GeneratorHelper::getReturnFromGenerator($connectionGenerator);

try {
return new ServerInfo(
$connection->getServerAddress(),
$connection->getProtocol(),
$connection->getServerAgent()
);
} finally {
$this->pool->release($connection);
}
}

public function closeConnections(): void
{
$this->pool->close();
Expand Down
1 change: 1 addition & 0 deletions testkit
Submodule testkit added at cbc816
2 changes: 2 additions & 0 deletions testkit-backend/src/Handlers/DriverClose.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public function __construct(MainRepository $repository)
*/
public function handle($request): DriverResponse
{
$driver = $this->repository->getDriver($request->getDriverId());
$driver->closeConnections();
$this->repository->removeDriver($request->getDriverId());

return new DriverResponse($request->getDriverId());
Expand Down
Loading
Loading