summaryrefslogtreecommitdiff
path: root/vendor/symfony/cache/Traits/RedisTrait.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/symfony/cache/Traits/RedisTrait.php')
-rw-r--r--vendor/symfony/cache/Traits/RedisTrait.php682
1 files changed, 682 insertions, 0 deletions
diff --git a/vendor/symfony/cache/Traits/RedisTrait.php b/vendor/symfony/cache/Traits/RedisTrait.php
new file mode 100644
index 0000000..a4fdc5d
--- /dev/null
+++ b/vendor/symfony/cache/Traits/RedisTrait.php
@@ -0,0 +1,682 @@
1<?php
2
3/*
4 * This file is part of the Symfony package.
5 *
6 * (c) Fabien Potencier <fabien@symfony.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Symfony\Component\Cache\Traits;
13
14use Predis\Command\Redis\UNLINK;
15use Predis\Connection\Aggregate\ClusterInterface;
16use Predis\Connection\Aggregate\RedisCluster;
17use Predis\Connection\Aggregate\ReplicationInterface;
18use Predis\Connection\Cluster\ClusterInterface as Predis2ClusterInterface;
19use Predis\Connection\Cluster\RedisCluster as Predis2RedisCluster;
20use Predis\Response\ErrorInterface;
21use Predis\Response\Status;
22use Relay\Relay;
23use Relay\Sentinel;
24use Symfony\Component\Cache\Exception\CacheException;
25use Symfony\Component\Cache\Exception\InvalidArgumentException;
26use Symfony\Component\Cache\Marshaller\DefaultMarshaller;
27use Symfony\Component\Cache\Marshaller\MarshallerInterface;
28
29/**
30 * @author Aurimas Niekis <aurimas@niekis.lt>
31 * @author Nicolas Grekas <p@tchwork.com>
32 *
33 * @internal
34 */
35trait RedisTrait
36{
37 private static array $defaultConnectionOptions = [
38 'class' => null,
39 'persistent' => 0,
40 'persistent_id' => null,
41 'timeout' => 30,
42 'read_timeout' => 0,
43 'retry_interval' => 0,
44 'tcp_keepalive' => 0,
45 'lazy' => null,
46 'redis_cluster' => false,
47 'redis_sentinel' => null,
48 'dbindex' => 0,
49 'failover' => 'none',
50 'ssl' => null, // see https://php.net/context.ssl
51 ];
52 private \Redis|Relay|\RedisArray|\RedisCluster|\Predis\ClientInterface $redis;
53 private MarshallerInterface $marshaller;
54
55 private function init(\Redis|Relay|\RedisArray|\RedisCluster|\Predis\ClientInterface $redis, string $namespace, int $defaultLifetime, ?MarshallerInterface $marshaller): void
56 {
57 parent::__construct($namespace, $defaultLifetime);
58
59 if (preg_match('#[^-+_.A-Za-z0-9]#', $namespace, $match)) {
60 throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0]));
61 }
62
63 if ($redis instanceof \Predis\ClientInterface && $redis->getOptions()->exceptions) {
64 $options = clone $redis->getOptions();
65 \Closure::bind(function () { $this->options['exceptions'] = false; }, $options, $options)();
66 $redis = new $redis($redis->getConnection(), $options);
67 }
68
69 $this->redis = $redis;
70 $this->marshaller = $marshaller ?? new DefaultMarshaller();
71 }
72
73 /**
74 * Creates a Redis connection using a DSN configuration.
75 *
76 * Example DSN:
77 * - redis://localhost
78 * - redis://example.com:1234
79 * - redis://secret@example.com/13
80 * - redis:///var/run/redis.sock
81 * - redis://secret@/var/run/redis.sock/13
82 *
83 * @param array $options See self::$defaultConnectionOptions
84 *
85 * @throws InvalidArgumentException when the DSN is invalid
86 */
87 public static function createConnection(#[\SensitiveParameter] string $dsn, array $options = []): \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|Relay
88 {
89 if (str_starts_with($dsn, 'redis:')) {
90 $scheme = 'redis';
91 } elseif (str_starts_with($dsn, 'rediss:')) {
92 $scheme = 'rediss';
93 } else {
94 throw new InvalidArgumentException('Invalid Redis DSN: it does not start with "redis[s]:".');
95 }
96
97 if (!\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
98 throw new CacheException('Cannot find the "redis" extension nor the "predis/predis" package.');
99 }
100
101 $params = preg_replace_callback('#^'.$scheme.':(//)?(?:(?:(?<user>[^:@]*+):)?(?<password>[^@]*+)@)?#', function ($m) use (&$auth) {
102 if (isset($m['password'])) {
103 if (\in_array($m['user'], ['', 'default'], true)) {
104 $auth = rawurldecode($m['password']);
105 } else {
106 $auth = [rawurldecode($m['user']), rawurldecode($m['password'])];
107 }
108
109 if ('' === $auth) {
110 $auth = null;
111 }
112 }
113
114 return 'file:'.($m[1] ?? '');
115 }, $dsn);
116
117 if (false === $params = parse_url($params)) {
118 throw new InvalidArgumentException('Invalid Redis DSN.');
119 }
120
121 $query = $hosts = [];
122
123 $tls = 'rediss' === $scheme;
124 $tcpScheme = $tls ? 'tls' : 'tcp';
125
126 if (isset($params['query'])) {
127 parse_str($params['query'], $query);
128
129 if (isset($query['host'])) {
130 if (!\is_array($hosts = $query['host'])) {
131 throw new InvalidArgumentException('Invalid Redis DSN: query parameter "host" must be an array.');
132 }
133 foreach ($hosts as $host => $parameters) {
134 if (\is_string($parameters)) {
135 parse_str($parameters, $parameters);
136 }
137 if (false === $i = strrpos($host, ':')) {
138 $hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters;
139 } elseif ($port = (int) substr($host, 1 + $i)) {
140 $hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters;
141 } else {
142 $hosts[$host] = ['scheme' => 'unix', 'path' => substr($host, 0, $i)] + $parameters;
143 }
144 }
145 $hosts = array_values($hosts);
146 }
147 }
148
149 if (isset($params['host']) || isset($params['path'])) {
150 if (!isset($params['dbindex']) && isset($params['path'])) {
151 if (preg_match('#/(\d+)?$#', $params['path'], $m)) {
152 $params['dbindex'] = $m[1] ?? $query['dbindex'] ?? '0';
153 $params['path'] = substr($params['path'], 0, -\strlen($m[0]));
154 } elseif (isset($params['host'])) {
155 throw new InvalidArgumentException('Invalid Redis DSN: parameter "dbindex" must be a number.');
156 }
157 }
158
159 if (isset($params['host'])) {
160 array_unshift($hosts, ['scheme' => $tcpScheme, 'host' => $params['host'], 'port' => $params['port'] ?? 6379]);
161 } else {
162 array_unshift($hosts, ['scheme' => 'unix', 'path' => $params['path']]);
163 }
164 }
165
166 if (!$hosts) {
167 throw new InvalidArgumentException('Invalid Redis DSN: missing host.');
168 }
169
170 if (isset($params['dbindex'], $query['dbindex']) && $params['dbindex'] !== $query['dbindex']) {
171 throw new InvalidArgumentException('Invalid Redis DSN: path and query "dbindex" parameters mismatch.');
172 }
173
174 $params += $query + $options + self::$defaultConnectionOptions;
175
176 if (isset($params['redis_sentinel']) && isset($params['sentinel_master'])) {
177 throw new InvalidArgumentException('Cannot use both "redis_sentinel" and "sentinel_master" at the same time.');
178 }
179
180 $params['redis_sentinel'] ??= $params['sentinel_master'] ?? null;
181
182 if (isset($params['redis_sentinel']) && !class_exists(\Predis\Client::class) && !class_exists(\RedisSentinel::class) && !class_exists(Sentinel::class)) {
183 throw new CacheException('Redis Sentinel support requires one of: "predis/predis", "ext-redis >= 5.2", "ext-relay".');
184 }
185
186 if (isset($params['lazy'])) {
187 $params['lazy'] = filter_var($params['lazy'], \FILTER_VALIDATE_BOOLEAN);
188 }
189 $params['redis_cluster'] = filter_var($params['redis_cluster'], \FILTER_VALIDATE_BOOLEAN);
190
191 if ($params['redis_cluster'] && isset($params['redis_sentinel'])) {
192 throw new InvalidArgumentException('Cannot use both "redis_cluster" and "redis_sentinel" at the same time.');
193 }
194
195 $class = $params['class'] ?? match (true) {
196 $params['redis_cluster'] => \extension_loaded('redis') ? \RedisCluster::class : \Predis\Client::class,
197 isset($params['redis_sentinel']) => match (true) {
198 \extension_loaded('redis') => \Redis::class,
199 \extension_loaded('relay') => Relay::class,
200 default => \Predis\Client::class,
201 },
202 1 < \count($hosts) && \extension_loaded('redis') => 1 < \count($hosts) ? \RedisArray::class : \Redis::class,
203 \extension_loaded('redis') => \Redis::class,
204 \extension_loaded('relay') => Relay::class,
205 default => \Predis\Client::class,
206 };
207
208 if (isset($params['redis_sentinel']) && !is_a($class, \Predis\Client::class, true) && !class_exists(\RedisSentinel::class) && !class_exists(Sentinel::class)) {
209 throw new CacheException(sprintf('Cannot use Redis Sentinel: class "%s" does not extend "Predis\Client" and neither ext-redis >= 5.2 nor ext-relay have been found.', $class));
210 }
211
212 $isRedisExt = is_a($class, \Redis::class, true);
213 $isRelayExt = !$isRedisExt && is_a($class, Relay::class, true);
214
215 if ($isRedisExt || $isRelayExt) {
216 $connect = $params['persistent'] || $params['persistent_id'] ? 'pconnect' : 'connect';
217
218 $initializer = static function () use ($class, $isRedisExt, $connect, $params, $auth, $hosts, $tls) {
219 $sentinelClass = $isRedisExt ? \RedisSentinel::class : Sentinel::class;
220 $redis = new $class();
221 $hostIndex = 0;
222 do {
223 $host = $hosts[$hostIndex]['host'] ?? $hosts[$hostIndex]['path'];
224 $port = $hosts[$hostIndex]['port'] ?? 0;
225 $passAuth = isset($params['auth']) && (!$isRedisExt || \defined('Redis::OPT_NULL_MULTIBULK_AS_NULL'));
226 $address = false;
227
228 if (isset($hosts[$hostIndex]['host']) && $tls) {
229 $host = 'tls://'.$host;
230 }
231
232 if (!isset($params['redis_sentinel'])) {
233 break;
234 }
235
236 try {
237 if (version_compare(phpversion('redis'), '6.0.0', '>=') && $isRedisExt) {
238 $options = [
239 'host' => $host,
240 'port' => $port,
241 'connectTimeout' => $params['timeout'],
242 'persistent' => $params['persistent_id'],
243 'retryInterval' => $params['retry_interval'],
244 'readTimeout' => $params['read_timeout'],
245 ];
246
247 if ($passAuth) {
248 $options['auth'] = $params['auth'];
249 }
250
251 $sentinel = new \RedisSentinel($options);
252 } else {
253 $extra = $passAuth ? [$params['auth']] : [];
254
255 $sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);
256 }
257
258 if ($address = $sentinel->getMasterAddrByName($params['redis_sentinel'])) {
259 [$host, $port] = $address;
260 }
261 } catch (\RedisException|\Relay\Exception $redisException) {
262 }
263 } while (++$hostIndex < \count($hosts) && !$address);
264
265 if (isset($params['redis_sentinel']) && !$address) {
266 throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']), previous: $redisException ?? null);
267 }
268
269 try {
270 $extra = [
271 'stream' => $params['ssl'] ?? null,
272 ];
273 $booleanStreamOptions = [
274 'allow_self_signed',
275 'capture_peer_cert',
276 'capture_peer_cert_chain',
277 'disable_compression',
278 'SNI_enabled',
279 'verify_peer',
280 'verify_peer_name',
281 ];
282
283 foreach ($extra['stream'] ?? [] as $streamOption => $value) {
284 if (\in_array($streamOption, $booleanStreamOptions, true) && \is_string($value)) {
285 $extra['stream'][$streamOption] = filter_var($value, \FILTER_VALIDATE_BOOL);
286 }
287 }
288
289 if (isset($params['auth'])) {
290 $extra['auth'] = $params['auth'];
291 }
292 @$redis->{$connect}($host, $port, (float) $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...\defined('Redis::SCAN_PREFIX') || !$isRedisExt ? [$extra] : []);
293
294 set_error_handler(function ($type, $msg) use (&$error) { $error = $msg; });
295 try {
296 $isConnected = $redis->isConnected();
297 } finally {
298 restore_error_handler();
299 }
300 if (!$isConnected) {
301 $error = preg_match('/^Redis::p?connect\(\): (.*)/', $error ?? $redis->getLastError() ?? '', $error) ? sprintf(' (%s)', $error[1]) : '';
302 throw new InvalidArgumentException('Redis connection failed: '.$error.'.');
303 }
304
305 if ((null !== $auth && !$redis->auth($auth))
306 // Due to a bug in phpredis we must always select the dbindex if persistent pooling is enabled
307 // @see https://github.com/phpredis/phpredis/issues/1920
308 // @see https://github.com/symfony/symfony/issues/51578
309 || (($params['dbindex'] || ('pconnect' === $connect && '0' !== \ini_get('redis.pconnect.pooling_enabled'))) && !$redis->select($params['dbindex']))
310 ) {
311 $e = preg_replace('/^ERR /', '', $redis->getLastError());
312 throw new InvalidArgumentException('Redis connection failed: '.$e.'.');
313 }
314
315 if (0 < $params['tcp_keepalive'] && (!$isRedisExt || \defined('Redis::OPT_TCP_KEEPALIVE'))) {
316 $redis->setOption($isRedisExt ? \Redis::OPT_TCP_KEEPALIVE : Relay::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
317 }
318 } catch (\RedisException|\Relay\Exception $e) {
319 throw new InvalidArgumentException('Redis connection failed: '.$e->getMessage());
320 }
321
322 return $redis;
323 };
324
325 if ($params['lazy']) {
326 $redis = $isRedisExt ? RedisProxy::createLazyProxy($initializer) : RelayProxy::createLazyProxy($initializer);
327 } else {
328 $redis = $initializer();
329 }
330 } elseif (is_a($class, \RedisArray::class, true)) {
331 foreach ($hosts as $i => $host) {
332 $hosts[$i] = match ($host['scheme']) {
333 'tcp' => $host['host'].':'.$host['port'],
334 'tls' => 'tls://'.$host['host'].':'.$host['port'],
335 default => $host['path'],
336 };
337 }
338 $params['lazy_connect'] = $params['lazy'] ?? true;
339 $params['connect_timeout'] = $params['timeout'];
340
341 try {
342 $redis = new $class($hosts, $params);
343 } catch (\RedisClusterException $e) {
344 throw new InvalidArgumentException('Redis connection failed: '.$e->getMessage());
345 }
346
347 if (0 < $params['tcp_keepalive'] && (!$isRedisExt || \defined('Redis::OPT_TCP_KEEPALIVE'))) {
348 $redis->setOption($isRedisExt ? \Redis::OPT_TCP_KEEPALIVE : Relay::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
349 }
350 } elseif (is_a($class, \RedisCluster::class, true)) {
351 $initializer = static function () use ($isRedisExt, $class, $params, $hosts) {
352 foreach ($hosts as $i => $host) {
353 $hosts[$i] = match ($host['scheme']) {
354 'tcp' => $host['host'].':'.$host['port'],
355 'tls' => 'tls://'.$host['host'].':'.$host['port'],
356 default => $host['path'],
357 };
358 }
359
360 try {
361 $redis = new $class(null, $hosts, $params['timeout'], $params['read_timeout'], (bool) $params['persistent'], $params['auth'] ?? '', ...\defined('Redis::SCAN_PREFIX') ? [$params['ssl'] ?? null] : []);
362 } catch (\RedisClusterException $e) {
363 throw new InvalidArgumentException('Redis connection failed: '.$e->getMessage());
364 }
365
366 if (0 < $params['tcp_keepalive'] && (!$isRedisExt || \defined('Redis::OPT_TCP_KEEPALIVE'))) {
367 $redis->setOption($isRedisExt ? \Redis::OPT_TCP_KEEPALIVE : Relay::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
368 }
369 $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, match ($params['failover']) {
370 'error' => \RedisCluster::FAILOVER_ERROR,
371 'distribute' => \RedisCluster::FAILOVER_DISTRIBUTE,
372 'slaves' => \RedisCluster::FAILOVER_DISTRIBUTE_SLAVES,
373 'none' => \RedisCluster::FAILOVER_NONE,
374 });
375
376 return $redis;
377 };
378
379 $redis = $params['lazy'] ? RedisClusterProxy::createLazyProxy($initializer) : $initializer();
380 } elseif (is_a($class, \Predis\ClientInterface::class, true)) {
381 if ($params['redis_cluster']) {
382 $params['cluster'] = 'redis';
383 } elseif (isset($params['redis_sentinel'])) {
384 $params['replication'] = 'sentinel';
385 $params['service'] = $params['redis_sentinel'];
386 }
387 $params += ['parameters' => []];
388 $params['parameters'] += [
389 'persistent' => $params['persistent'],
390 'timeout' => $params['timeout'],
391 'read_write_timeout' => $params['read_timeout'],
392 'tcp_nodelay' => true,
393 ];
394 if ($params['dbindex']) {
395 $params['parameters']['database'] = $params['dbindex'];
396 }
397 if (null !== $auth) {
398 if (\is_array($auth)) {
399 // ACL
400 $params['parameters']['username'] = $auth[0];
401 $params['parameters']['password'] = $auth[1];
402 } else {
403 $params['parameters']['password'] = $auth;
404 }
405 }
406
407 if (isset($params['ssl'])) {
408 foreach ($hosts as $i => $host) {
409 $hosts[$i]['ssl'] ??= $params['ssl'];
410 }
411 }
412
413 if (1 === \count($hosts) && !($params['redis_cluster'] || $params['redis_sentinel'])) {
414 $hosts = $hosts[0];
415 } elseif (\in_array($params['failover'], ['slaves', 'distribute'], true) && !isset($params['replication'])) {
416 $params['replication'] = true;
417 $hosts[0] += ['alias' => 'master'];
418 }
419 $params['exceptions'] = false;
420
421 $redis = new $class($hosts, array_diff_key($params, self::$defaultConnectionOptions));
422 if (isset($params['redis_sentinel'])) {
423 $redis->getConnection()->setSentinelTimeout($params['timeout']);
424 }
425 } elseif (class_exists($class, false)) {
426 throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis", "RedisArray", "RedisCluster", "Relay\Relay" nor "Predis\ClientInterface".', $class));
427 } else {
428 throw new InvalidArgumentException(sprintf('Class "%s" does not exist.', $class));
429 }
430
431 return $redis;
432 }
433
434 protected function doFetch(array $ids): iterable
435 {
436 if (!$ids) {
437 return [];
438 }
439
440 $result = [];
441
442 if ($this->redis instanceof \Predis\ClientInterface && ($this->redis->getConnection() instanceof ClusterInterface || $this->redis->getConnection() instanceof Predis2ClusterInterface)) {
443 $values = $this->pipeline(function () use ($ids) {
444 foreach ($ids as $id) {
445 yield 'get' => [$id];
446 }
447 });
448 } else {
449 $values = $this->redis->mget($ids);
450
451 if (!\is_array($values) || \count($values) !== \count($ids)) {
452 return [];
453 }
454
455 $values = array_combine($ids, $values);
456 }
457
458 foreach ($values as $id => $v) {
459 if ($v) {
460 $result[$id] = $this->marshaller->unmarshall($v);
461 }
462 }
463
464 return $result;
465 }
466
467 protected function doHave(string $id): bool
468 {
469 return (bool) $this->redis->exists($id);
470 }
471
472 protected function doClear(string $namespace): bool
473 {
474 if ($this->redis instanceof \Predis\ClientInterface) {
475 $prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
476 $prefixLen = \strlen($prefix ?? '');
477 }
478
479 $cleared = true;
480 $hosts = $this->getHosts();
481 $host = reset($hosts);
482 if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
483 // Predis supports info command only on the master in replication environments
484 $hosts = [$host->getClientFor('master')];
485 }
486
487 foreach ($hosts as $host) {
488 if (!isset($namespace[0])) {
489 $cleared = $host->flushDb() && $cleared;
490 continue;
491 }
492
493 $info = $host->info('Server');
494 $info = !$info instanceof ErrorInterface ? $info['Server'] ?? $info : ['redis_version' => '2.0'];
495
496 if ($host instanceof Relay) {
497 $prefix = Relay::SCAN_PREFIX & $host->getOption(Relay::OPT_SCAN) ? '' : $host->getOption(Relay::OPT_PREFIX);
498 $prefixLen = \strlen($host->getOption(Relay::OPT_PREFIX) ?? '');
499 } elseif (!$host instanceof \Predis\ClientInterface) {
500 $prefix = \defined('Redis::SCAN_PREFIX') && (\Redis::SCAN_PREFIX & $host->getOption(\Redis::OPT_SCAN)) ? '' : $host->getOption(\Redis::OPT_PREFIX);
501 $prefixLen = \strlen($host->getOption(\Redis::OPT_PREFIX) ?? '');
502 }
503 $pattern = $prefix.$namespace.'*';
504
505 if (!version_compare($info['redis_version'], '2.8', '>=')) {
506 // As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
507 // can hang your server when it is executed against large databases (millions of items).
508 // Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
509 $unlink = version_compare($info['redis_version'], '4.0', '>=') ? 'UNLINK' : 'DEL';
510 $args = $this->redis instanceof \Predis\ClientInterface ? [0, $pattern] : [[$pattern], 0];
511 $cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]) for i=1,#keys,5000 do redis.call('$unlink',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $args[0], $args[1]) && $cleared;
512 continue;
513 }
514
515 $cursor = null;
516 do {
517 $keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $pattern, 'COUNT', 1000) : $host->scan($cursor, $pattern, 1000);
518 if (isset($keys[1]) && \is_array($keys[1])) {
519 $cursor = $keys[0];
520 $keys = $keys[1];
521 }
522 if ($keys) {
523 if ($prefixLen) {
524 foreach ($keys as $i => $key) {
525 $keys[$i] = substr($key, $prefixLen);
526 }
527 }
528 $this->doDelete($keys);
529 }
530 } while ($cursor);
531 }
532
533 return $cleared;
534 }
535
536 protected function doDelete(array $ids): bool
537 {
538 if (!$ids) {
539 return true;
540 }
541
542 if ($this->redis instanceof \Predis\ClientInterface && ($this->redis->getConnection() instanceof ClusterInterface || $this->redis->getConnection() instanceof Predis2ClusterInterface)) {
543 static $del;
544 $del ??= (class_exists(UNLINK::class) ? 'unlink' : 'del');
545
546 $this->pipeline(function () use ($ids, $del) {
547 foreach ($ids as $id) {
548 yield $del => [$id];
549 }
550 })->rewind();
551 } else {
552 static $unlink = true;
553
554 if ($unlink) {
555 try {
556 $unlink = false !== $this->redis->unlink($ids);
557 } catch (\Throwable) {
558 $unlink = false;
559 }
560 }
561
562 if (!$unlink) {
563 $this->redis->del($ids);
564 }
565 }
566
567 return true;
568 }
569
570 protected function doSave(array $values, int $lifetime): array|bool
571 {
572 if (!$values = $this->marshaller->marshall($values, $failed)) {
573 return $failed;
574 }
575
576 $results = $this->pipeline(function () use ($values, $lifetime) {
577 foreach ($values as $id => $value) {
578 if (0 >= $lifetime) {
579 yield 'set' => [$id, $value];
580 } else {
581 yield 'setEx' => [$id, $lifetime, $value];
582 }
583 }
584 });
585
586 foreach ($results as $id => $result) {
587 if (true !== $result && (!$result instanceof Status || Status::get('OK') !== $result)) {
588 $failed[] = $id;
589 }
590 }
591
592 return $failed;
593 }
594
595 private function pipeline(\Closure $generator, ?object $redis = null): \Generator
596 {
597 $ids = [];
598 $redis ??= $this->redis;
599
600 if ($redis instanceof \RedisCluster || ($redis instanceof \Predis\ClientInterface && ($redis->getConnection() instanceof RedisCluster || $redis->getConnection() instanceof Predis2RedisCluster))) {
601 // phpredis & predis don't support pipelining with RedisCluster
602 // see https://github.com/phpredis/phpredis/blob/develop/cluster.markdown#pipelining
603 // see https://github.com/nrk/predis/issues/267#issuecomment-123781423
604 $results = [];
605 foreach ($generator() as $command => $args) {
606 $results[] = $redis->{$command}(...$args);
607 $ids[] = 'eval' === $command ? ($redis instanceof \Predis\ClientInterface ? $args[2] : $args[1][0]) : $args[0];
608 }
609 } elseif ($redis instanceof \Predis\ClientInterface) {
610 $results = $redis->pipeline(static function ($redis) use ($generator, &$ids) {
611 foreach ($generator() as $command => $args) {
612 $redis->{$command}(...$args);
613 $ids[] = 'eval' === $command ? $args[2] : $args[0];
614 }
615 });
616 } elseif ($redis instanceof \RedisArray) {
617 $connections = $results = $ids = [];
618 foreach ($generator() as $command => $args) {
619 $id = 'eval' === $command ? $args[1][0] : $args[0];
620 if (!isset($connections[$h = $redis->_target($id)])) {
621 $connections[$h] = [$redis->_instance($h), -1];
622 $connections[$h][0]->multi(\Redis::PIPELINE);
623 }
624 $connections[$h][0]->{$command}(...$args);
625 $results[] = [$h, ++$connections[$h][1]];
626 $ids[] = $id;
627 }
628 foreach ($connections as $h => $c) {
629 $connections[$h] = $c[0]->exec();
630 }
631 foreach ($results as $k => [$h, $c]) {
632 $results[$k] = $connections[$h][$c];
633 }
634 } else {
635 $redis->multi($redis instanceof Relay ? Relay::PIPELINE : \Redis::PIPELINE);
636 foreach ($generator() as $command => $args) {
637 $redis->{$command}(...$args);
638 $ids[] = 'eval' === $command ? $args[1][0] : $args[0];
639 }
640 $results = $redis->exec();
641 }
642
643 if (!$redis instanceof \Predis\ClientInterface && 'eval' === $command && $redis->getLastError()) {
644 $e = $redis instanceof Relay ? new \Relay\Exception($redis->getLastError()) : new \RedisException($redis->getLastError());
645 $results = array_map(fn ($v) => false === $v ? $e : $v, (array) $results);
646 }
647
648 if (\is_bool($results)) {
649 return;
650 }
651
652 foreach ($ids as $k => $id) {
653 yield $id => $results[$k];
654 }
655 }
656
657 private function getHosts(): array
658 {
659 $hosts = [$this->redis];
660 if ($this->redis instanceof \Predis\ClientInterface) {
661 $connection = $this->redis->getConnection();
662 if (($connection instanceof ClusterInterface || $connection instanceof Predis2ClusterInterface) && $connection instanceof \Traversable) {
663 $hosts = [];
664 foreach ($connection as $c) {
665 $hosts[] = new \Predis\Client($c);
666 }
667 }
668 } elseif ($this->redis instanceof \RedisArray) {
669 $hosts = [];
670 foreach ($this->redis->_hosts() as $host) {
671 $hosts[] = $this->redis->_instance($host);
672 }
673 } elseif ($this->redis instanceof \RedisCluster) {
674 $hosts = [];
675 foreach ($this->redis->_masters() as $host) {
676 $hosts[] = new RedisClusterNodeProxy($host, $this->redis);
677 }
678 }
679
680 return $hosts;
681 }
682}