Redis
Redis Stream
Created: 14 Jan 2026
Updated: 14 Jan 2026
Redis Stream Consumer Example
#region
using StackExchange.Redis;
#endregion
namespace RedisExample.Consumers;
/// <summary>
/// Hosted background worker that consumes entries from the Redis stream <c>my-stream-11</c>
/// as consumer <c>consumer1</c> of the consumer group <c>mygroup-1</c>.
/// </summary>
/// <remarks>
/// Lifecycle:
/// <list type="number">
/// <item><see cref="StartAsync"/> resolves the Redis database (index 1) from <c>RedisService</c>.</item>
/// <item><see cref="ExecuteAsync"/> makes sure the consumer group exists, then polls the stream with
/// XREADGROUP, logs each entry's <c>message_content</c> field and acknowledges the entry.</item>
/// <item><see cref="StopAsync"/> first stops the polling loop, then removes this consumer from the group.</item>
/// </list>
/// </remarks>
public class RedisConsumerBackgroundService(RedisService redisService, ILogger<RedisConsumerBackgroundService> logger)
    : BackgroundService
{
    private const string StreamName = "my-stream-11";
    private const string GroupName = "mygroup-1";

    // Name of the stream entry field that carries the payload.
    private const string MessageContentField = "message_content";

    // Maximum number of entries requested per XREADGROUP call.
    private const int BatchSize = 10;

    // Pause between polls when the stream had nothing new; the read below returns
    // immediately, so without a pause an idle stream would turn the loop into a busy spin.
    private static readonly TimeSpan IdlePollDelay = TimeSpan.FromMilliseconds(500);

    // Pause after a Redis error before the next attempt.
    private static readonly TimeSpan ErrorBackoff = TimeSpan.FromSeconds(2);

    // Artificial per-message processing time (kept from the original example).
    private static readonly TimeSpan SimulatedWork = TimeSpan.FromMilliseconds(400);

    // Unique name for this consumer (worker)
    private readonly string _consumerName = "consumer1";

    private IDatabase? _database;

    /// <summary>
    /// The Redis database resolved in <see cref="StartAsync"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">Accessed before <see cref="StartAsync"/> ran.</exception>
    private IDatabase Database =>
        _database ?? throw new InvalidOperationException(
            $"{nameof(RedisConsumerBackgroundService)} has not been started; the Redis database is not initialised.");

    /// <summary>
    /// Resolves the Redis database (index 1) and then starts the background loop.
    /// </summary>
    public override Task StartAsync(CancellationToken cancellationToken)
    {
        _database = redisService.GetDb(1);
        return base.StartAsync(cancellationToken);
    }

    /// <summary>
    /// Checks if a consumer group exists
    /// </summary>
    /// <param name="streamName">Stream whose groups are inspected.</param>
    /// <param name="groupName">Group name to look for.</param>
    /// <returns>
    /// <c>true</c> when a group with that name is reported for the stream; <c>false</c> when it is not,
    /// or when Redis answers with a server error (e.g. the stream has not been created yet).
    /// </returns>
    private async Task<bool> ConsumerGroupExistsAsync(string streamName, string groupName)
    {
        try
        {
            // Get all consumer groups in the stream using StreamGroupInfo
            var groups = await Database.StreamGroupInfoAsync(streamName);

            // Check if a group with the desired name exists
            return groups.Any(g => g.Name == groupName);
        }
        catch (RedisServerException)
        {
            // If the stream hasn't been created yet, an error is returned, meaning the group doesn't exist
            return false;
        }
    }

    /// <summary>
    /// Creates the consumer group (and the stream, if missing) unless it already exists.
    /// </summary>
    private async Task EnsureConsumerGroupAsync(IDatabase db)
    {
        if (await ConsumerGroupExistsAsync(StreamName, GroupName))
        {
            logger.LogInformation("ℹ️ Consumer group '{GroupName}' already exists.", GroupName);
            return;
        }

        try
        {
            // StreamCreateConsumerGroupAsync executes the XGROUP CREATE command.
            // '0-0' : Read all messages from the beginning of the stream.
            // createStream: true : Create the stream if it doesn't exist.
            await db.StreamCreateConsumerGroupAsync(StreamName, GroupName, StreamPosition.Beginning, createStream: true);
            logger.LogInformation("✅ Consumer group '{GroupName}' created successfully.", GroupName);
        }
        catch (RedisServerException ex) when (ex.Message.StartsWith("BUSYGROUP", StringComparison.Ordinal))
        {
            // Another instance created the group between the existence check and XGROUP CREATE.
            logger.LogInformation("ℹ️ Consumer group '{GroupName}' already exists.", GroupName);
        }
    }

    /// <summary>
    /// Ensures the consumer group exists and then polls the stream for new entries until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// XREADGROUP position values:
    /// <list type="bullet">
    /// <item><c>"&gt;"</c> - only NEW messages never delivered to any consumer of the group;
    /// Redis assigns them to the requesting consumer. This is the normal consumption flow and what this loop uses.</item>
    /// <item><c>"0-0"</c> - messages already delivered to THIS consumer but not yet acknowledged
    /// (its Pending Entries List). Used for crash recovery / retry logic.</item>
    /// <item>A specific ID (e.g. <c>"1609459200000-0"</c>) - pending messages of this consumer after that ID;
    /// used to resume from a checkpoint or to reprocess selectively.</item>
    /// </list>
    /// Every processed entry is acknowledged so it does not accumulate in the Pending Entries List.
    /// This loop does not re-read pending entries ("0-0") on start-up.
    /// </remarks>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var db = Database;
        await EnsureConsumerGroupAsync(db);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var entries = await db.StreamReadGroupAsync(
                        StreamName,
                        GroupName,
                        _consumerName,
                        ">", // Read new messages
                        BatchSize);

                    if (entries.Length == 0)
                    {
                        await Task.Delay(IdlePollDelay, stoppingToken);
                        continue;
                    }

                    logger.LogInformation("📦 Processing {Count} new messages...", entries.Length);

                    // The batch is intentionally processed to completion even when shutdown has been
                    // requested: these entries are already assigned to this consumer, and StopAsync
                    // removes the consumer, which would drop anything left unacknowledged.
                    foreach (var entry in entries)
                    {
                        await ProcessEntryAsync(db, entry);
                    }
                }
                catch (Exception ex) when (ex is RedisException or TimeoutException)
                {
                    // Keep the worker alive across Redis errors instead of faulting the service.
                    logger.LogError(ex, "❌ Redis error while consuming stream '{StreamName}'. Retrying...", StreamName);
                    await Task.Delay(ErrorBackoff, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: one of the waits above was interrupted by the stopping token.
        }
    }

    /// <summary>
    /// Logs the payload of a single stream entry and acknowledges it in the consumer group.
    /// </summary>
    private async Task ProcessEntryAsync(IDatabase db, StreamEntry entry)
    {
        var messageContent = entry.Values.FirstOrDefault(x => x.Name == MessageContentField).Value;
        logger.LogInformation("🔄 {ConsumerName} > Content: {Content}", _consumerName, messageContent);

        // Simulated processing time.
        await Task.Delay(SimulatedWork);

        // Acknowledge only after processing, so an entry that was never handled stays pending.
        await db.StreamAcknowledgeAsync(StreamName, GroupName, entry.Id);
    }

    /// <summary>
    /// Stops the polling loop and then removes this consumer from the consumer group.
    /// </summary>
    /// <remarks>
    /// The base implementation is awaited first so that the loop is no longer reading from the
    /// stream when the consumer is deleted; a read issued after the deletion would register the
    /// consumer again. Failures during the removal are logged and not rethrown.
    /// </remarks>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        logger.LogInformation("🛑 Stopping Redis consumer '{ConsumerName}' from group '{GroupName}'...", _consumerName, GroupName);

        await base.StopAsync(cancellationToken);

        try
        {
            // Remove this consumer from the consumer group
            // This cleans up the consumer and removes it from the group's consumer list
            if (_database is not null)
            {
                // The call reports how many pending (unacknowledged) entries the consumer still owned.
                var droppedPending = await _database.StreamDeleteConsumerAsync(StreamName, GroupName, _consumerName);
                if (droppedPending > 0)
                {
                    logger.LogWarning(
                        "⚠️ Consumer '{ConsumerName}' was removed with {PendingCount} unacknowledged message(s).",
                        _consumerName,
                        droppedPending);
                }

                logger.LogInformation("✅ Consumer '{ConsumerName}' removed from group '{GroupName}' successfully.", _consumerName, GroupName);
            }
        }
        catch (RedisServerException ex)
        {
            logger.LogWarning(ex, "⚠️ Failed to remove consumer '{ConsumerName}'. It may not exist.", _consumerName);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "❌ Error while stopping consumer '{ConsumerName}'.", _consumerName);
        }

        logger.LogInformation("🔴 Redis consumer '{ConsumerName}' stopped.", _consumerName);
    }
}