diff --git a/Adapter/AbstractTagAwareAdapter.php b/Adapter/AbstractTagAwareAdapter.php
index a384b16a..21be7f52 100644
--- a/Adapter/AbstractTagAwareAdapter.php
+++ b/Adapter/AbstractTagAwareAdapter.php
@@ -35,7 +35,7 @@ abstract class AbstractTagAwareAdapter implements TagAwareAdapterInterface, TagA
     use AbstractAdapterTrait;
     use ContractsTrait;
 
-    private const TAGS_PREFIX = "\0tags\0";
+    protected const TAGS_PREFIX = "\0tags\0";
 
     protected function __construct(string $namespace = '', int $defaultLifetime = 0)
     {
diff --git a/Adapter/RedisTagAwareAdapter.php b/Adapter/RedisTagAwareAdapter.php
index 186b32e7..6cd7e980 100644
--- a/Adapter/RedisTagAwareAdapter.php
+++ b/Adapter/RedisTagAwareAdapter.php
@@ -22,6 +22,7 @@
 use Symfony\Component\Cache\Marshaller\DeflateMarshaller;
 use Symfony\Component\Cache\Marshaller\MarshallerInterface;
 use Symfony\Component\Cache\Marshaller\TagAwareMarshaller;
+use Symfony\Component\Cache\PruneableInterface;
 use Symfony\Component\Cache\Traits\RedisClusterProxy;
 use Symfony\Component\Cache\Traits\RedisProxy;
 use Symfony\Component\Cache\Traits\RedisTrait;
@@ -45,7 +46,7 @@
  * @author Nicolas Grekas
  * @author André Rømcke
  */
-class RedisTagAwareAdapter extends AbstractTagAwareAdapter
+class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableInterface
 {
     use RedisTrait;
 
@@ -59,14 +60,24 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
      * @var string|null detected eviction policy used on Redis server
      */
     private $redisEvictionPolicy;
+    /**
+     * @var string|null detected version of the Redis server
+     */
+    private $redisVersion;
+    /**
+     * @var bool|null whether this namespace has already been pruned during this run, and with which result
+     */
+    private $pruneResult;
+    private $pruneWithCompression;
     private $namespace;
 
     /**
      * @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy $redis The redis client
      * @param string $namespace The default namespace
      * @param int $defaultLifetime The default lifetime
+     * @param bool $pruneWithCompression Also remove orphaned references from tag sets while pruning; significantly more resource intensive
      */
-    public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null)
+    public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = true)
     {
         if ($redis instanceof \Predis\ClientInterface && $redis->getConnection() instanceof ClusterInterface && !$redis->getConnection() instanceof PredisCluster) {
             throw new InvalidArgumentException(sprintf('Unsupported Predis cluster connection: only "%s" is, "%s" given.', PredisCluster::class, get_debug_type($redis->getConnection())));
@@ -84,6 +95,7 @@ public function __construct($redis, string $namespace = '', int $defaultLifetime
 
         $this->init($redis, $namespace, $defaultLifetime, new TagAwareMarshaller($marshaller));
         $this->namespace = $namespace;
+        $this->pruneWithCompression = $pruneWithCompression;
     }
 
     /**
@@ -295,6 +307,36 @@ protected function doInvalidate(array $tagIds): bool
         return $success;
     }
 
+    /**
+     * @TODO Move to RedisTrait? It already has a version check - this would be handy.
+     *
+     * @return string
+     */
+    private function getRedisVersion(): string
+    {
+        if (null !== $this->redisVersion) {
+            return $this->redisVersion;
+        }
+
+        $hosts = $this->getHosts();
+        $host = reset($hosts);
+        if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
+            // Predis supports the INFO command only on the master in replication environments
+            $hosts = [$host->getClientFor('master')];
+        }
+
+        foreach ($hosts as $host) {
+            $info = $host->info('Server');
+
+            if ($info instanceof ErrorInterface) {
+                continue;
+            }
+            return $this->redisVersion = $info['redis_version'];
+        }
+        // Fall back to 2.0, like RedisTrait does.
+        return $this->redisVersion = '2.0';
+    }
+
     private function getRedisEvictionPolicy(): string
     {
         if (null !== $this->redisEvictionPolicy) {
@@ -322,4 +364,377 @@ private function getRedisEvictionPolicy(): string
 
         return $this->redisEvictionPolicy = '';
     }
+
+    private function getPrefix(): string
+    {
+        if ($this->redis instanceof \Predis\ClientInterface) {
+            $prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
+        } elseif (\is_array($prefix = $this->redis->getOption(\Redis::OPT_PREFIX) ?? '')) {
+            $prefix = current($prefix);
+        }
+
+        return $prefix;
+    }
+
+    /**
+     * Returns all existing tag keys from the cache.
+     *
+     * @TODO Verify the Lua scripts are Redis Cluster safe.
+     */
+    protected function getAllTagKeys(): array
+    {
+        $tagKeys = [];
+        $prefix = $this->getPrefix();
+        // the "\0" has to be trimmed off for use in the Lua script
+        $tagsPrefix = trim(self::TAGS_PREFIX);
+
+        // scan for all SET entries whose key contains the tag prefix
+        $getTagsLua = <<<'EOLUA'
+            redis.replicate_commands()
+            local cursor = ARGV[1]
+            local prefix = ARGV[2]
+            local tagPrefix = string.gsub(KEYS[1], prefix, "")
+            return redis.call('SCAN', cursor, 'COUNT', 5000, 'MATCH', '*' .. tagPrefix .. '*', 'TYPE', 'set')
+        EOLUA;
+        $cursor = 0;
+        do {
+            $results = $this->pipeline(function () use ($getTagsLua, $cursor, $prefix, $tagsPrefix) {
+                yield 'eval' => [$getTagsLua, [$tagsPrefix, $cursor, $prefix], 1];
+            });
+
+            $setKeys = $results->valid() ? iterator_to_array($results) : [];
+            // $setKeys[$tagsPrefix] might be a \RedisException object,
+            // so check before using it.
+            if (\is_array($setKeys[$tagsPrefix] ?? null)) {
+                [$cursor, $ids] = $setKeys[$tagsPrefix];
+                // merge the fetched ids together
+                $tagKeys = array_merge($tagKeys, $ids);
+            } elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) {
+                $this->logger->error($setKeys[$tagsPrefix]->getMessage());
+            }
+        } while ($cursor = (int) $cursor);
+
+        return $tagKeys;
+    }
+
+    /**
+     * Applies a callback to all tag keys and yields its result for each of them.
+     *
+     * @TODO Verify the Lua scripts are Redis Cluster safe.
+     */
+    protected function processAllTagKeys(\Closure $generator): \Generator
+    {
+        $prefix = $this->getPrefix();
+        // the "\0" has to be trimmed off for use in the Lua script
+        $tagsPrefix = trim(self::TAGS_PREFIX);
+
+        // scan for all SET entries whose key contains the tag prefix
+        $getTagsLua = <<<'EOLUA'
+            redis.replicate_commands()
+            local cursor = ARGV[1]
+            local prefix = ARGV[2]
+            local tagPrefix = string.gsub(KEYS[1], prefix, "")
+            return redis.call('SCAN', cursor, 'COUNT', 5000, 'MATCH', '*' .. tagPrefix .. '*', 'TYPE', 'set')
+        EOLUA;
+        $cursor = 0;
+        do {
+            $results = $this->pipeline(function () use ($getTagsLua, $cursor, $prefix, $tagsPrefix) {
+                yield 'eval' => [$getTagsLua, [$tagsPrefix, $cursor, $prefix], 1];
+            });
+            $setKeys = $results->valid() ? iterator_to_array($results) : [];
+            // $setKeys[$tagsPrefix] might be a \RedisException object,
+            // so check before using it.
+            if (\is_array($setKeys[$tagsPrefix] ?? null)) {
+                [$cursor, $tagKeys] = $setKeys[$tagsPrefix];
+                // apply the callback to each fetched tag key and yield its result
+                foreach ($tagKeys as $tagKey) {
+                    yield $tagKey => $generator($tagKey);
+                }
+            } elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) {
+                $this->logger->error($setKeys[$tagsPrefix]->getMessage());
+            }
+        } while ($cursor = (int) $cursor);
+    }
+
+    private function processSetMembers(\Closure $generator, string $key): \Generator
+    {
+        // Lua for fetching the members of a SET in chunks
+        $getSetContentLua = <<<'EOLUA'
+            redis.replicate_commands()
+            local cursor = ARGV[1]
+            return redis.call('SSCAN', KEYS[1], cursor, 'COUNT', 5000)
+        EOLUA;
+
+        $cursor = 0;
+        do {
+            // Fetch all referenced cache keys from the tag entry.
+            $results = $this->pipeline(function () use ($getSetContentLua, $key, $cursor) {
+                yield 'eval' => [$getSetContentLua, [$key, $cursor], 1];
+            });
+            [$cursor, $setMembers] = $results->valid() ? $results->current() : [null, null];
+            yield $cursor => $generator($setMembers);
+        } while ($cursor = (int) $cursor);
+    }
+
+    /**
+     * Accepts a list of cache keys and returns the subset that no longer exists (orphaned keys).
+     *
+     * The method batches the existence checks to reduce the number of Redis calls.
+     *
+     * @param array $cacheKeys Cache keys to check
+     * @param int   $chunks    Number of chunks to split $cacheKeys into
+     *
+     * @return array The orphaned cache keys
+     */
+    private function getOrphanedCacheKeys(array $cacheKeys, int $chunks = 2): array
+    {
+        $orphanedCacheKeys = [];
+        if (version_compare($this->getRedisVersion(), '3.0.3', '>=')) {
+            // EXISTS accepts multiple keys as of Redis 3.0.3, so divide and
+            // conquer for faster execution.
+            $cacheKeysChunks = array_chunk($cacheKeys, max(1, (int) floor(\count($cacheKeys) / $chunks)), true);
+            foreach ($cacheKeysChunks as $cacheKeysChunk) {
+                $result = $this->pipeline(function () use ($cacheKeysChunk) {
+                    yield 'exists' => [$cacheKeysChunk];
+                });
+                if ($result->valid()) {
+                    $existingKeys = $result->current();
+                    if (0 === $existingKeys) {
+                        // None of the keys in this chunk exist - register them all as orphaned.
+                        $orphanedCacheKeys = array_merge($orphanedCacheKeys, $cacheKeysChunk);
+                    } elseif ($existingKeys !== ($cacheKeysChunkCount = \count($cacheKeysChunk))) {
+                        // Some exist, some don't - recurse with a new set of chunks.
+                        // The chunk size should be small enough that there is a
+                        // high chance of hitting an EXISTS call with 0 existing
+                        // keys in it, thus ending the recursion.
+                        // The simplest approach would be to use the ratio between
+                        // existing and missing keys, but that doesn't account for
+                        // fragmentation, so the chunks have to be smaller than that
+                        // ratio to still get all-orphan hits when the orphans are
+                        // scattered. Example: 10 keys with 3 orphans -> at least
+                        // 3 chunks; with fragmentation, more is probably better.
+                        // For now, the ratio between total and missing keys,
+                        // rounded up and incremented by one, is used to account
+                        // for some fragmentation.
+                        // @TODO Does anyone have probabilistic stats on the best
+                        // chunk size based on the number of keys and orphaned keys?
+                        // @TODO At what chunk size does checking keys individually
+                        // become more efficient?
+                        $newChunks = (int) max(1, ceil($cacheKeysChunkCount / ($cacheKeysChunkCount - $existingKeys)) + 1);
+                        $orphanedCacheKeys = array_merge($orphanedCacheKeys, $this->getOrphanedCacheKeys($cacheKeysChunk, $newChunks));
+                    }
+                }
+            }
+        } else {
+            // Without multi-key support in EXISTS, each reference has to be
+            // checked individually to build the delta.
+            foreach ($cacheKeys as $cacheKey) {
+                $result = $this->pipeline(function () use ($cacheKey) {
+                    yield 'exists' => [$cacheKey];
+                });
+                if ($result->valid() && !$result->current()) {
+                    $orphanedCacheKeys[] = $cacheKey;
+                }
+            }
+        }
+
+        return $orphanedCacheKeys;
+    }
+
+    /**
+     * Checks all tags in the cache for orphaned items and creates a "report" array.
+     *
+     * @TODO Verify the Lua scripts are Redis Cluster safe.
+     *
+     * @return array{tagKeys: string[], orphanedTagKeys: string[], orphanedTagReferenceKeyCount?: array}
+     *   tagKeys: list of all tags in the cache
+     *   orphanedTagKeys: list of tags that only reference orphaned cache items
+     *   orphanedTagReferenceKeyCount: number of orphaned cache item references per tag, keyed by tag
+     */
+    private function getOrphanedTagsStats(): array
+    {
+        // Iterate over all tags, analyzing the keys referencing each tag.
+        $tags = $this->processAllTagKeys(function ($tagKey) {
+            return $this->processSetMembers(function ($membersChunk) {
+                $keyReferencesCount = \count($membersChunk);
+                // By default assume all references exist.
+                $existingKeysCount = \count($membersChunk);
+                $result = $this->pipeline(function () use ($membersChunk) {
+                    yield 'exists' => [$membersChunk];
+                });
+                if ($result->valid()) {
+                    $existingKeysCount = $result->current();
+                }
+                yield [
+                    'keyReferencesCount' => $keyReferencesCount,
+                    'existingKeysCount' => $existingKeysCount,
+                ];
+            }, $tagKey);
+        });
+
+        $stats = [
+            'tagKeys' => [],
+            'orphanedTagKeys' => [],
+            'orphanedTagReferenceKeyCount' => [],
+        ];
+        foreach ($tags as $tag => $referencedKeyChunks) {
+            $stats['tagKeys'][$tag] = $tag;
+            $orphanedReferencesCount = 0;
+            $tagHasExistingKeys = false;
+            foreach ($referencedKeyChunks as $referencedKeyChunk) {
+                foreach ($referencedKeyChunk as $chunkData) {
+                    $tagHasExistingKeys = $tagHasExistingKeys || ($chunkData['existingKeysCount'] > 0);
+                    $orphanedReferencesCount += $chunkData['keyReferencesCount'] - $chunkData['existingKeysCount'];
+                }
+            }
+            if (!$tagHasExistingKeys) {
+                $stats['orphanedTagKeys'][$tag] = $tag;
+            } else {
+                $stats['orphanedTagReferenceKeyCount'][$tag] = $orphanedReferencesCount;
+            }
+        }
+
+        return $stats;
+    }
+
+    /**
+     * Iterates over all existing tag sets and checks whether the keys they
+     * reference still exist. If none of the referenced keys exist anymore,
+     * the tag set is deleted. If only a subset of the references is orphaned
+     * and compression is enabled, the orphaned references are removed from
+     * the tag set once the orphaned/total reference ratio exceeds the given
+     * threshold.
+     *
+     * The processing happens progressively so that a partial cleanup is kept
+     * even if an error occurs later in the run.
+     *
+     * @param bool  $compression               If enabled, orphaned references in tag sets are
+     *                                         removed too. Can be quite resource intensive.
+     * @param float $compressionRatioThreshold Compression is only executed if at least this
+     *                                         fraction of a tag's references is orphaned.
+     * @param int   $pruneThrottlingMs         Delay in milliseconds between major prune
+     *                                         actions, used to reduce the load on Redis while pruning.
+ * + */ + private function pruneOrphanedTags( + bool $compression = false, + float $compressionRatioThreshold = 0.3, + int $pruneThrottlingMs = 50 + ): bool { + $success = true; + // Enable compression only if redis version is at least 3.0.3 otherwise + // the processing has to check each single key instead of batching. + $compression = $compression && version_compare($this->getRedisVersion(), '3.0.3', '>='); + + // Iterates over all tags, analyzing the keys referencing the tag. + $tags = $this->processAllTagKeys(function ($tagKey) use ($compression, $pruneThrottlingMs) { + return $this->processSetMembers(function ($membersChunk) use ($compression, $pruneThrottlingMs) { + $keyReferencesCount = count($membersChunk); + // By default assume all references exist. + $existingKeysCount = count($membersChunk); + $result = $this->pipeline(function () use ($membersChunk) { + yield 'exists' => [$membersChunk]; + }); + if ($result->valid()) { + $existingKeysCount = $result->current(); + } + $returnValue = [ + 'keyReferencesCount' => $keyReferencesCount, + 'existingKeysCount' => $existingKeysCount, + ]; + if ($compression) { + $returnValue['orphanedKeyReferences'] = $this->getOrphanedCacheKeys($membersChunk); + } + usleep($pruneThrottlingMs * 1000); + yield $returnValue; + }, $tagKey); + }); + + $orphanedTags = []; + foreach ($tags as $tag => $referencedKeyChunks) { + usleep($pruneThrottlingMs * 1000); + $tagHasExistingKeys = false; + $tagOrphanedKeyReferences = []; + $tagKeyReferencesCount = 0; + foreach ($referencedKeyChunks as $referencedKeyChunk) { + foreach ($referencedKeyChunk as $chunkData) { + // This notation should ensure the existingKeysCount are only + // evaluated up until the point $tagHasExistingKeys is true + // after that there's no need to eval again. + $tagHasExistingKeys = $tagHasExistingKeys || (is_numeric($chunkData['existingKeysCount']) && $chunkData['existingKeysCount'] > 0); + $tagKeyReferencesCount += $chunkData['keyReferencesCount'] ?? 0; + // Collect the orphaned key references in the tag set. + // @TODO If the memory foot print is getting to big we might + // have to process the orphans right here or in the actual + // generator in order to avoid this collection of keys. + // However, collecting them allows for reduction in command + // execution. + if (!empty($chunkData['orphanedKeyReferences'])) { + $tagOrphanedKeyReferences = array_merge($tagOrphanedKeyReferences, $chunkData['orphanedKeyReferences']); + } + } + } + if (!$tagHasExistingKeys) { + $orphanedTags[$tag] = $tag; + } + // Delete in batches of 250 to chip away even if there are errors. + if (count($orphanedTags) >= 250) { + try { + $result = $this->pipeline(function () use ($orphanedTags) { + yield 'del' => [$orphanedTags]; + }); + if (!$result->valid() || count($orphanedTags) !== $result->current()) { + $success = false; + } + } catch (\Throwable $e) { + $success = false; + } finally { + $orphanedTags = []; + } + } + // If compression mode is enabled and the count between + // referenced and existing cache keys differs by more than + // the compression ratio threshold run the compression. + // The ratio trigger ensures that a more sensible amount of + // items is processed. Currently, the set is processed as + // soon as at least a third of references are orphaned. 
+            if (
+                $tagHasExistingKeys
+                && $compression
+                && !empty($tagOrphanedKeyReferences)
+                && (\count($tagOrphanedKeyReferences) / $tagKeyReferencesCount) >= $compressionRatioThreshold
+            ) {
+                try {
+                    // Remove orphaned cache item references from the tag set.
+                    // SREM supports multiple members as of Redis 2.4.0; the
+                    // compression handling itself is only enabled as of Redis 3.0.3.
+                    $result = $this->pipeline(function () use ($tag, $tagOrphanedKeyReferences) {
+                        yield 'sRem' => array_merge([$tag], $tagOrphanedKeyReferences);
+                    });
+                    if (!$result->valid() || \count($tagOrphanedKeyReferences) !== $result->current()) {
+                        $success = false;
+                    }
+                } catch (\Throwable $e) {
+                    $success = false;
+                }
+            }
+        }
+
+        if (!empty($orphanedTags)) {
+            $result = $this->pipeline(function () use ($orphanedTags) {
+                yield 'del' => [$orphanedTags];
+            });
+            if (!$result->valid() || \count($orphanedTags) !== $result->current()) {
+                $success = false;
+            }
+        }
+
+        return $success;
+    }
+
+    public function prune(): bool
+    {
+        // Only prune once per adapter instance; later calls return the first result.
+        if (!isset($this->pruneResult)) {
+            $this->pruneResult = $this->pruneOrphanedTags($this->pruneWithCompression);
+        }
+
+        return $this->pruneResult;
+    }
 }
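
For reviewers, a minimal usage sketch of the behaviour this diff adds (not part of the patch); the DSN, namespace and lifetime are placeholder values:

```php
<?php

use Symfony\Component\Cache\Adapter\RedisAdapter;
use Symfony\Component\Cache\Adapter\RedisTagAwareAdapter;

// Placeholder DSN; any \Redis, \RedisCluster or Predis client works as well.
$client = RedisAdapter::createConnection('redis://localhost');

// The new fifth constructor argument toggles the "compressed" prune, which also
// removes orphaned references from tag sets (more expensive, enabled by default).
$cache = new RedisTagAwareAdapter($client, 'app', 3600, null, true);

// With PruneableInterface implemented, orphaned tag sets can now be garbage
// collected explicitly, e.g. from a cron job; in a full-stack application the
// cache:pool:prune command picks up pruneable pools automatically.
$cache->prune();
```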
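As a worked illustration of the re-chunking heuristic in getOrphanedCacheKeys() (the numbers below are made up for the example, not taken from the patch):

```php
<?php

// Suppose EXISTS reports 7 of the 10 keys in the current chunk as still
// present, i.e. 3 references in the chunk are orphaned.
$cacheKeysChunkCount = 10;
$existingKeys = 7;

// ceil(10 / (10 - 7)) + 1 = 5 chunks on the next recursion level ...
$newChunks = (int) max(1, ceil($cacheKeysChunkCount / ($cacheKeysChunkCount - $existingKeys)) + 1);

// ... i.e. chunks of floor(10 / 5) = 2 keys each, which makes it likely that
// at least one chunk contains only orphaned keys and the recursion stops there.
$chunkSize = max(1, (int) floor($cacheKeysChunkCount / $newChunks));
```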