Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<CodeAnalysisRuleset>$(MSBuildThisFileDirectory)Shared.ruleset</CodeAnalysisRuleset>
<MSBuildWarningsAsMessages>NETSDK1069</MSBuildWarningsAsMessages>
<NoWarn>$(NoWarn);NU5105;NU1507;SER001;SER002</NoWarn>
<NoWarn>$(NoWarn);NU5105;NU1507;SER001;SER002;SER003</NoWarn>
<PackageReleaseNotes>https://stackexchange.github.io/StackExchange.Redis/ReleaseNotes</PackageReleaseNotes>
<PackageProjectUrl>https://stackexchange.github.io/StackExchange.Redis/</PackageProjectUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
4 changes: 4 additions & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ Current package versions:
| ------------ | ----------------- | ----- |
| [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |

## unreleased

- Implement idempotent stream entry (IDMP) support ([#3006 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3006))

## 2.10.14

- Fix bug with connection startup failing in low-memory scenarios ([#3002 by nathan-miller23](https://github.com/StackExchange/StackExchange.Redis/pull/3002))
Expand Down
18 changes: 18 additions & 0 deletions docs/Streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ You also have the option to override the auto-generated message ID by passing yo
db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
```

Idempotent write-at-most-once production
===

From Redis 8.6, streams support idempotent write-at-most-once production. This is achieved by passing a `StreamIdempotentId` to the `StreamAdd` method. Using idempotent ids avoids
duplicate entries in the stream, even in the event of a failure and retry.

The `StreamIdempotentId` contains a producer id and an optional idempotent id. The producer id should be unique for a given data generator and should be stable and consistent between runs.
The optional idempotent id should be unique for a given data item. If the idempotent id is not provided, the server will generate it from the content of the data item.

```csharp
// int someUniqueExternalSourceId = ... // optional
var idempotentId = new StreamIdempotentId("ticket_generator");
// optionally, new StreamIdempotentId("ticket_generator", someUniqueExternalSourceId)
var messageId = db.StreamAdd("events_stream", "foo_name", "bar_value", idempotentId);
```

~~~~The `StreamConfigure` method can be used to configure the stream, in particular the IDMP map. The `StreamConfiguration` class has properties for the idempotent producer (IDMP) duration and max-size.

Reading from Streams
===

Expand Down
25 changes: 25 additions & 0 deletions docs/exp/SER003.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Redis 8.6 is currently in preview and may be subject to change.

New features in Redis 8.6 include:

- `HOTKEYS` for profiling CPU and network hot-spots by key
- `XADD IDMP[AUTP]` for idempotent (write-at-most-once) stream addition

The corresponding library feature must also be considered subject to change:

1. Existing bindings may cease working correctly if the underlying server API changes.
2. Changes to the server API may require changes to the library API, manifesting in either/both of build-time
or run-time breaks.

While this seems *unlikely*, it must be considered a possibility. If you acknowledge this, you can suppress
this warning by adding the following to your `csproj` file:

```xml
<NoWarn>$(NoWarn);SER003</NoWarn>
```

or more granularly / locally in C#:

``` c#
#pragma warning disable SER003
```
109 changes: 107 additions & 2 deletions src/StackExchange.Redis/APITypes/StreamInfo.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
namespace StackExchange.Redis;
using System.Diagnostics.CodeAnalysis;

namespace StackExchange.Redis;

/// <summary>
/// Describes stream information retrieved using the XINFO STREAM command. <see cref="IDatabase.StreamInfo"/>.
/// </summary>
public readonly struct StreamInfo
{
internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int groups, StreamEntry firstEntry, StreamEntry lastEntry, RedisValue lastGeneratedId)
// OK, I accept that this parameter list / size is getting silly, but: it is too late
// to refactor this as a class.
internal StreamInfo(
int length,
int radixTreeKeys,
int radixTreeNodes,
int groups,
StreamEntry firstEntry,
StreamEntry lastEntry,
RedisValue lastGeneratedId,
RedisValue maxDeletedEntryId,
long entriesAdded,
RedisValue recordedFirstEntryId,
long idmpDuration,
long idmpMaxSize,
long pidsTracked,
long iidsTracked,
long iidsAdded,
long iidsDuplicates)
{
Length = length;
RadixTreeKeys = radixTreeKeys;
Expand All @@ -14,6 +34,19 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group
FirstEntry = firstEntry;
LastEntry = lastEntry;
LastGeneratedId = lastGeneratedId;

// 7.0
MaxDeletedEntryId = maxDeletedEntryId;
EntriesAdded = entriesAdded;
RecordedFirstEntryId = recordedFirstEntryId;

// 8.6
IdmpDuration = idmpDuration;
IdmpMaxSize = idmpMaxSize;
PidsTracked = pidsTracked;
IidsTracked = iidsTracked;
IidsAdded = iidsAdded;
IidsDuplicates = iidsDuplicates;
}

/// <summary>
Expand Down Expand Up @@ -50,4 +83,76 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group
/// The last generated id.
/// </summary>
public RedisValue LastGeneratedId { get; }

/// <summary>
/// The first id recorded for the stream.
/// </summary>
public RedisValue RecordedFirstEntryId { get; }

/// <summary>
/// The count of all entries added to the stream during its lifetime.
/// </summary>
public long EntriesAdded { get; }

/// <summary>
/// The maximal entry ID that was deleted from the stream.
/// </summary>
public RedisValue MaxDeletedEntryId { get; }

/// <summary>
/// The duration value configured for the stream’s IDMP map (seconds), or <c>-1</c> if unavailable.
/// </summary>
public long IdmpDuration
{
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
get;
}

/// <summary>
/// The maxsize value configured for the stream’s IDMP map, or <c>-1</c> if unavailable.
/// </summary>
public long IdmpMaxSize
{
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
get;
}

/// <summary>
/// The number of idempotent pids currently tracked in the stream, or <c>-1</c> if unavailable.
/// </summary>
public long PidsTracked
{
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
get;
}

/// <summary>
/// The number of idempotent ids currently tracked in the stream, or <c>-1</c> if unavailable.
/// This count reflects active iids that haven't expired or been evicted yet.
/// </summary>
public long IidsTracked
{
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
get;
}

/// <summary>
/// The count of all entries with an idempotent iid added to the stream during its lifetime, or <c>-1</c> if unavailable.
/// This is a cumulative counter that increases with each idempotent entry added.
/// </summary>
public long IidsAdded
{
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
get;
}

/// <summary>
/// The count of all duplicate iids (for all pids) detected during the stream's lifetime, or <c>-1</c> if unavailable.
/// This is a cumulative counter that increases with each duplicate iid.
/// </summary>
public long IidsDuplicates
{
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
get;
}
}
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ internal enum RedisCommand
XADD,
XAUTOCLAIM,
XCLAIM,
XCFGSET,
XDEL,
XDELEX,
XGROUP,
Expand Down Expand Up @@ -375,6 +376,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.VREM:
case RedisCommand.VSETATTR:
case RedisCommand.XAUTOCLAIM:
case RedisCommand.XCFGSET:
case RedisCommand.ZADD:
case RedisCommand.ZDIFFSTORE:
case RedisCommand.ZINTERSTORE:
Expand Down
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/Experiments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal static class Experiments
public const string VectorSets = "SER001";
// ReSharper disable once InconsistentNaming
public const string Server_8_4 = "SER002";
// ReSharper disable once InconsistentNaming
public const string Server_8_6 = "SER003";
}
}

Expand Down
52 changes: 50 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2662,7 +2662,27 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
/// The command returns the ID of the newly created stream entry, using
/// the idempotent id (pid/iid) mechanism to ensure at-most-once production.
/// See <see cref="StreamIdempotentId"/> for more information of the idempotent API.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamField">The field name for the stream entry.</param>
/// <param name="streamValue">The value to set in the stream entry.</param>
/// <param name="idempotentId">The idempotent producer (pid) and optionally id (iid) to use for this entry.</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Adds an entry using the specified values to the given stream key.
Expand All @@ -2679,10 +2699,38 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
/// The command returns the ID of the newly created stream entry, using
/// the idempotent id (pid/iid) mechanism to ensure at-most-once production.
/// See <see cref="StreamIdempotentId"/> for more information of the idempotent API.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
/// <param name="idempotentId">The idempotent producer (pid) and optionally id (iid) to use for this entry.</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Configures a stream, in particular the IDMP map.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="configuration">The configuration to apply.</param>
/// <param name="flags">The flags to use for this operation.</param>
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// Messages that have been idle for more than <paramref name="minIdleTimeInMs"/> will be claimed.
Expand Down
13 changes: 13 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net;
using System.Threading.Tasks;

Expand Down Expand Up @@ -655,8 +656,20 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(

/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, StreamIdempotentId, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], StreamIdempotentId, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <inheritdoc cref="IDatabase.StreamConfigure(RedisKey, StreamConfiguration, CommandFlags)"/>
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamAutoClaim(RedisKey, RedisValue, RedisValue, long, RedisValue, int?, CommandFlags)"/>
Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);

Expand Down
9 changes: 9 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,15 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, Red
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);

public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);

public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);

public Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) =>
Inner.StreamConfigureAsync(ToInner(key), configuration, flags);

public Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

Expand Down
Loading
Loading