diff --git a/Sources/RediStack/Commands/PubSubCommands.swift b/Sources/RediStack/Commands/PubSubCommands.swift index d78bac4..cf057e2 100644 --- a/Sources/RediStack/Commands/PubSubCommands.swift +++ b/Sources/RediStack/Commands/PubSubCommands.swift @@ -52,8 +52,9 @@ extension RedisCommand { /// [PUBSUB NUMSUB](https://redis.io/commands/pubsub#codepubsub-numsub-channel-1--channel-ncode) /// - Parameter channels: A list of channel names to collect the subscriber counts for. public static func pubsubNumsub(forChannels channels: [RedisChannelName]) -> RedisCommand<[RedisChannelName: Int]> { - let args = channels.map { $0.convertedToRESPValue() } - return .init(keyword: "PUBSUB NUMSUB", arguments: args) { + var args: [RESPValue] = [.init(bulk: "NUMSUB")] + args.append(convertingContentsOf: channels) + return .init(keyword: "PUBSUB", arguments: args) { let response = try $0.map(to: [RESPValue].self) assert(response.count == channels.count * 2, "Unexpected response size!") @@ -62,11 +63,12 @@ extension RedisCommand { return try channels .enumerated() .reduce(into: [:]) { (result, next) in - assert(next.element.rawValue == response[next.offset].string, "Unexpected value in current index!") + let responseOffset = next.offset * 2 + assert(next.element.rawValue == response[responseOffset].string, "Unexpected value in current index!") - guard let count = response[next.offset + 1].int else { + guard let count = response[responseOffset + 1].int else { throw RedisClientError.assertionFailure( - message: "Unexpected value at position \(next.offset + 1) in \(response)" + message: "Unexpected value at position \(responseOffset + 1) in \(response)" ) } result[next.element] = count diff --git a/Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift b/Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift index c6d0e6d..d7f9f27 100644 --- a/Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift +++ b/Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift @@ -232,6 +232,42 @@ final class RedisPubSubCommandsTests: RediStackIntegrationTestCase { let allChannels = try queryConnection.send(.pubsubChannels()).wait() XCTAssertGreaterThanOrEqual(allChannels.count, channelNames.count) } + + func test_pubSubNumsub() throws { + let fn = #function + let subscriber = try self.makeNewConnection() + defer { try? subscriber.close().wait() } + + let channelNames = (1...5).map { + RedisChannelName("\(fn)\($0)") + } + + for channelName in channelNames { + try subscriber.subscribe( + to: channelName, + messageReceiver: { _, _ in }, + onSubscribe: nil, + onUnsubscribe: nil + ).wait() + } + XCTAssertTrue(subscriber.isSubscribed) + defer { + // Unsubscribe (clean up) + try? subscriber.unsubscribe(from: channelNames).wait() + XCTAssertFalse(subscriber.isSubscribed) + } + + // Make another connection to query on. + let queryConnection = try self.makeNewConnection() + defer { try? queryConnection.close().wait() } + + let notSubscribedChannel = RedisChannelName("\(fn)_notsubbed") + let numSubs = try queryConnection.send(.pubsubNumsub(forChannels: [channelNames[0], notSubscribedChannel])).wait() + XCTAssertEqual(numSubs.count, 2) + + XCTAssertGreaterThanOrEqual(numSubs[channelNames[0]] ?? 0, 1) + XCTAssertEqual(numSubs[notSubscribedChannel], 0) + } } final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTestCase {