diff --git a/Directory.Build.props b/Directory.Build.props
index 06542aa32..e36f0f7d1 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -10,7 +10,7 @@
true
$(MSBuildThisFileDirectory)Shared.ruleset
NETSDK1069
- $(NoWarn);NU5105;NU1507;SER001;SER002
+ $(NoWarn);NU5105;NU1507;SER001;SER002;SER003
https://stackexchange.github.io/StackExchange.Redis/ReleaseNotes
https://stackexchange.github.io/StackExchange.Redis/
MIT
diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md
index f77a1d10a..2ccc6d2d0 100644
--- a/docs/ReleaseNotes.md
+++ b/docs/ReleaseNotes.md
@@ -6,6 +6,10 @@ Current package versions:
| ------------ | ----------------- | ----- |
| [](https://www.nuget.org/packages/StackExchange.Redis/) | [](https://www.nuget.org/packages/StackExchange.Redis/) | [](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |
+## unreleased
+
+- Implement idempotent stream entry (IDMP) support ([#3006 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3006))
+
## 2.10.14
- Fix bug with connection startup failing in low-memory scenarios ([#3002 by nathan-miller23](https://github.com/StackExchange/StackExchange.Redis/pull/3002))
diff --git a/docs/Streams.md b/docs/Streams.md
index 4378a528d..47e82c2b9 100644
--- a/docs/Streams.md
+++ b/docs/Streams.md
@@ -37,6 +37,24 @@ You also have the option to override the auto-generated message ID by passing yo
db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
```
+Idempotent write-at-most-once production
+===
+
+From Redis 8.6, streams support idempotent write-at-most-once production. This is achieved by passing a `StreamIdempotentId` to the `StreamAdd` method. Using idempotent ids avoids
+duplicate entries in the stream, even in the event of a failure and retry.
+
+The `StreamIdempotentId` contains a producer id and an optional idempotent id. The producer id should be unique for a given data generator and should be stable and consistent between runs.
+The optional idempotent id should be unique for a given data item. If the idempotent id is not provided, the server will generate it from the content of the data item.
+
+```csharp
+// int someUniqueExternalSourceId = ... // optional
+var idempotentId = new StreamIdempotentId("ticket_generator");
+// optionally, new StreamIdempotentId("ticket_generator", someUniqueExternalSourceId)
+var messageId = db.StreamAdd("events_stream", "foo_name", "bar_value", idempotentId);
+```
+
+~~~~The `StreamConfigure` method can be used to configure the stream, in particular the IDMP map. The `StreamConfiguration` class has properties for the idempotent producer (IDMP) duration and max-size.
+
Reading from Streams
===
diff --git a/docs/exp/SER003.md b/docs/exp/SER003.md
new file mode 100644
index 000000000..651434063
--- /dev/null
+++ b/docs/exp/SER003.md
@@ -0,0 +1,25 @@
+Redis 8.6 is currently in preview and may be subject to change.
+
+New features in Redis 8.6 include:
+
+- `HOTKEYS` for profiling CPU and network hot-spots by key
+- `XADD IDMP[AUTP]` for idempotent (write-at-most-once) stream addition
+
+The corresponding library feature must also be considered subject to change:
+
+1. Existing bindings may cease working correctly if the underlying server API changes.
+2. Changes to the server API may require changes to the library API, manifesting in either/both of build-time
+ or run-time breaks.
+
+While this seems *unlikely*, it must be considered a possibility. If you acknowledge this, you can suppress
+this warning by adding the following to your `csproj` file:
+
+```xml
+$(NoWarn);SER003
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER003
+```
diff --git a/src/StackExchange.Redis/APITypes/StreamInfo.cs b/src/StackExchange.Redis/APITypes/StreamInfo.cs
index 230ea47fb..e37df5add 100644
--- a/src/StackExchange.Redis/APITypes/StreamInfo.cs
+++ b/src/StackExchange.Redis/APITypes/StreamInfo.cs
@@ -1,11 +1,31 @@
-namespace StackExchange.Redis;
+using System.Diagnostics.CodeAnalysis;
+
+namespace StackExchange.Redis;
///
/// Describes stream information retrieved using the XINFO STREAM command. .
///
public readonly struct StreamInfo
{
- internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int groups, StreamEntry firstEntry, StreamEntry lastEntry, RedisValue lastGeneratedId)
+ // OK, I accept that this parameter list / size is getting silly, but: it is too late
+ // to refactor this as a class.
+ internal StreamInfo(
+ int length,
+ int radixTreeKeys,
+ int radixTreeNodes,
+ int groups,
+ StreamEntry firstEntry,
+ StreamEntry lastEntry,
+ RedisValue lastGeneratedId,
+ RedisValue maxDeletedEntryId,
+ long entriesAdded,
+ RedisValue recordedFirstEntryId,
+ long idmpDuration,
+ long idmpMaxSize,
+ long pidsTracked,
+ long iidsTracked,
+ long iidsAdded,
+ long iidsDuplicates)
{
Length = length;
RadixTreeKeys = radixTreeKeys;
@@ -14,6 +34,19 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group
FirstEntry = firstEntry;
LastEntry = lastEntry;
LastGeneratedId = lastGeneratedId;
+
+ // 7.0
+ MaxDeletedEntryId = maxDeletedEntryId;
+ EntriesAdded = entriesAdded;
+ RecordedFirstEntryId = recordedFirstEntryId;
+
+ // 8.6
+ IdmpDuration = idmpDuration;
+ IdmpMaxSize = idmpMaxSize;
+ PidsTracked = pidsTracked;
+ IidsTracked = iidsTracked;
+ IidsAdded = iidsAdded;
+ IidsDuplicates = iidsDuplicates;
}
///
@@ -50,4 +83,76 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group
/// The last generated id.
///
public RedisValue LastGeneratedId { get; }
+
+ ///
+ /// The first id recorded for the stream.
+ ///
+ public RedisValue RecordedFirstEntryId { get; }
+
+ ///
+ /// The count of all entries added to the stream during its lifetime.
+ ///
+ public long EntriesAdded { get; }
+
+ ///
+ /// The maximal entry ID that was deleted from the stream.
+ ///
+ public RedisValue MaxDeletedEntryId { get; }
+
+ ///
+ /// The duration value configured for the stream’s IDMP map (seconds), or -1 if unavailable.
+ ///
+ public long IdmpDuration
+ {
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ get;
+ }
+
+ ///
+ /// The maxsize value configured for the stream’s IDMP map, or -1 if unavailable.
+ ///
+ public long IdmpMaxSize
+ {
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ get;
+ }
+
+ ///
+ /// The number of idempotent pids currently tracked in the stream, or -1 if unavailable.
+ ///
+ public long PidsTracked
+ {
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ get;
+ }
+
+ ///
+ /// The number of idempotent ids currently tracked in the stream, or -1 if unavailable.
+ /// This count reflects active iids that haven't expired or been evicted yet.
+ ///
+ public long IidsTracked
+ {
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ get;
+ }
+
+ ///
+ /// The count of all entries with an idempotent iid added to the stream during its lifetime, or -1 if unavailable.
+ /// This is a cumulative counter that increases with each idempotent entry added.
+ ///
+ public long IidsAdded
+ {
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ get;
+ }
+
+ ///
+ /// The count of all duplicate iids (for all pids) detected during the stream's lifetime, or -1 if unavailable.
+ /// This is a cumulative counter that increases with each duplicate iid.
+ ///
+ public long IidsDuplicates
+ {
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ get;
+ }
}
diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs
index 14f304a35..f731a6676 100644
--- a/src/StackExchange.Redis/Enums/RedisCommand.cs
+++ b/src/StackExchange.Redis/Enums/RedisCommand.cs
@@ -229,6 +229,7 @@ internal enum RedisCommand
XADD,
XAUTOCLAIM,
XCLAIM,
+ XCFGSET,
XDEL,
XDELEX,
XGROUP,
@@ -375,6 +376,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.VREM:
case RedisCommand.VSETATTR:
case RedisCommand.XAUTOCLAIM:
+ case RedisCommand.XCFGSET:
case RedisCommand.ZADD:
case RedisCommand.ZDIFFSTORE:
case RedisCommand.ZINTERSTORE:
diff --git a/src/StackExchange.Redis/Experiments.cs b/src/StackExchange.Redis/Experiments.cs
index 441b0ec54..547838873 100644
--- a/src/StackExchange.Redis/Experiments.cs
+++ b/src/StackExchange.Redis/Experiments.cs
@@ -12,6 +12,8 @@ internal static class Experiments
public const string VectorSets = "SER001";
// ReSharper disable once InconsistentNaming
public const string Server_8_4 = "SER002";
+ // ReSharper disable once InconsistentNaming
+ public const string Server_8_6 = "SER003";
}
}
diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs
index 3df162682..cf2ecafac 100644
--- a/src/StackExchange.Redis/Interfaces/IDatabase.cs
+++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs
@@ -2662,7 +2662,27 @@ IEnumerable SortedSetScan(
///
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
-#pragma warning restore RS0026
+
+ ///
+ /// Adds an entry using the specified values to the given stream key.
+ /// If key does not exist, a new key holding a stream is created.
+ /// The command returns the ID of the newly created stream entry, using
+ /// the idempotent id (pid/iid) mechanism to ensure at-most-once production.
+ /// See for more information of the idempotent API.
+ ///
+ /// The key of the stream.
+ /// The field name for the stream entry.
+ /// The value to set in the stream entry.
+ /// The idempotent producer (pid) and optionally id (iid) to use for this entry.
+ /// The maximum length of the stream.
+ /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.
+ /// Specifies the maximal count of entries that will be evicted.
+ /// Determines how stream trimming should be performed.
+ /// The flags to use for this operation.
+ /// The ID of the newly created message.
+ ///
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
///
/// Adds an entry using the specified values to the given stream key.
@@ -2679,10 +2699,38 @@ IEnumerable SortedSetScan(
/// The flags to use for this operation.
/// The ID of the newly created message.
///
-#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
+
+ ///
+ /// Adds an entry using the specified values to the given stream key.
+ /// If key does not exist, a new key holding a stream is created.
+ /// The command returns the ID of the newly created stream entry, using
+ /// the idempotent id (pid/iid) mechanism to ensure at-most-once production.
+ /// See for more information of the idempotent API.
+ ///
+ /// The key of the stream.
+ /// The fields and their associated values to set in the stream entry.
+ /// The idempotent producer (pid) and optionally id (iid) to use for this entry.
+ /// The maximum length of the stream.
+ /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.
+ /// Specifies the maximal count of entries that will be evicted.
+ /// Determines how stream trimming should be performed.
+ /// The flags to use for this operation.
+ /// The ID of the newly created message.
+ ///
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026
+ ///
+ /// Configures a stream, in particular the IDMP map.
+ ///
+ /// The key of the stream.
+ /// The configuration to apply.
+ /// The flags to use for this operation.
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None);
+
///
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
/// Messages that have been idle for more than will be claimed.
diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
index 855ea6c8f..029c7975e 100644
--- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
+++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
+using System.IO;
using System.Net;
using System.Threading.Tasks;
@@ -655,8 +656,20 @@ IAsyncEnumerable SortedSetScanAsync(
///
Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
+
+ ///
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
+
+ ///
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026
+ ///
+ [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+ Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None);
+
///
Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);
diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
index fe23b73c1..c7831fdb8 100644
--- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
+++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
@@ -621,6 +621,15 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red
public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
+ public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamAddAsync(ToInner(key), streamField, streamValue, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);
+
+ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamAddAsync(ToInner(key), streamPairs, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);
+
+ public Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamConfigureAsync(ToInner(key), configuration, flags);
+
public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs
index 69775c15d..01fe28505 100644
--- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs
+++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Net;
namespace StackExchange.Redis.KeyspaceIsolation
@@ -603,6 +604,15 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
+ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamAdd(ToInner(key), streamField, streamValue, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);
+
+ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamAdd(ToInner(key), streamPairs, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);
+
+ public void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) =>
+ Inner.StreamConfigure(ToInner(key), configuration, flags);
+
public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
index ab058de62..3a80ab570 100644
--- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
+++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
@@ -1 +1,31 @@
#nullable enable
+[SER003]override StackExchange.Redis.StreamIdempotentId.Equals(object? obj) -> bool
+[SER003]override StackExchange.Redis.StreamIdempotentId.GetHashCode() -> int
+[SER003]override StackExchange.Redis.StreamIdempotentId.ToString() -> string!
+[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
+[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
+[SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void
+[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
+[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
+[SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task!
+[SER003]StackExchange.Redis.StreamConfiguration
+[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long?
+[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void
+[SER003]StackExchange.Redis.StreamConfiguration.IdmpMaxSize.get -> long?
+[SER003]StackExchange.Redis.StreamConfiguration.IdmpMaxSize.set -> void
+[SER003]StackExchange.Redis.StreamConfiguration.StreamConfiguration() -> void
+[SER003]StackExchange.Redis.StreamIdempotentId
+[SER003]StackExchange.Redis.StreamIdempotentId.IdempotentId.get -> StackExchange.Redis.RedisValue
+[SER003]StackExchange.Redis.StreamIdempotentId.ProducerId.get -> StackExchange.Redis.RedisValue
+[SER003]StackExchange.Redis.StreamIdempotentId.StreamIdempotentId() -> void
+[SER003]StackExchange.Redis.StreamIdempotentId.StreamIdempotentId(StackExchange.Redis.RedisValue producerId) -> void
+[SER003]StackExchange.Redis.StreamIdempotentId.StreamIdempotentId(StackExchange.Redis.RedisValue producerId, StackExchange.Redis.RedisValue idempotentId) -> void
+[SER003]StackExchange.Redis.StreamInfo.IdmpDuration.get -> long
+[SER003]StackExchange.Redis.StreamInfo.IdmpMaxSize.get -> long
+[SER003]StackExchange.Redis.StreamInfo.IidsAdded.get -> long
+[SER003]StackExchange.Redis.StreamInfo.IidsDuplicates.get -> long
+[SER003]StackExchange.Redis.StreamInfo.IidsTracked.get -> long
+[SER003]StackExchange.Redis.StreamInfo.PidsTracked.get -> long
+StackExchange.Redis.StreamInfo.EntriesAdded.get -> long
+StackExchange.Redis.StreamInfo.MaxDeletedEntryId.get -> StackExchange.Redis.RedisValue
+StackExchange.Redis.StreamInfo.RecordedFirstEntryId.get -> StackExchange.Redis.RedisValue
diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs
index 056a5380a..ac3c14bcc 100644
--- a/src/StackExchange.Redis/RedisDatabase.cs
+++ b/src/StackExchange.Redis/RedisDatabase.cs
@@ -2782,6 +2782,23 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str
var msg = GetStreamAddMessage(
key,
messageId ?? StreamConstants.AutoGeneratedId,
+ StreamIdempotentId.Empty,
+ maxLength,
+ useApproximateMaxLength,
+ new NameValueEntry(streamField, streamValue),
+ limit,
+ mode,
+ flags);
+
+ return ExecuteSync(msg, ResultProcessor.RedisValue);
+ }
+
+ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None)
+ {
+ var msg = GetStreamAddMessage(
+ key,
+ StreamConstants.AutoGeneratedId,
+ idempotentId,
maxLength,
useApproximateMaxLength,
new NameValueEntry(streamField, streamValue),
@@ -2800,6 +2817,23 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red
var msg = GetStreamAddMessage(
key,
messageId ?? StreamConstants.AutoGeneratedId,
+ StreamIdempotentId.Empty,
+ maxLength,
+ useApproximateMaxLength,
+ new NameValueEntry(streamField, streamValue),
+ limit,
+ mode,
+ flags);
+
+ return ExecuteAsync(msg, ResultProcessor.RedisValue);
+ }
+
+ public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None)
+ {
+ var msg = GetStreamAddMessage(
+ key,
+ StreamConstants.AutoGeneratedId,
+ idempotentId,
maxLength,
useApproximateMaxLength,
new NameValueEntry(streamField, streamValue),
@@ -2818,6 +2852,23 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal
var msg = GetStreamAddMessage(
key,
messageId ?? StreamConstants.AutoGeneratedId,
+ StreamIdempotentId.Empty,
+ maxLength,
+ useApproximateMaxLength,
+ streamPairs,
+ limit,
+ mode,
+ flags);
+
+ return ExecuteSync(msg, ResultProcessor.RedisValue);
+ }
+
+ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None)
+ {
+ var msg = GetStreamAddMessage(
+ key,
+ StreamConstants.AutoGeneratedId,
+ idempotentId,
maxLength,
useApproximateMaxLength,
streamPairs,
@@ -2836,6 +2887,7 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair
var msg = GetStreamAddMessage(
key,
messageId ?? StreamConstants.AutoGeneratedId,
+ StreamIdempotentId.Empty,
maxLength,
useApproximateMaxLength,
streamPairs,
@@ -2846,6 +2898,78 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair
return ExecuteAsync(msg, ResultProcessor.RedisValue);
}
+ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None)
+ {
+ var msg = GetStreamAddMessage(
+ key,
+ StreamConstants.AutoGeneratedId,
+ idempotentId,
+ maxLength,
+ useApproximateMaxLength,
+ streamPairs,
+ limit,
+ mode,
+ flags);
+
+ return ExecuteAsync(msg, ResultProcessor.RedisValue);
+ }
+
+ public void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None)
+ {
+ var msg = GetStreamConfigureMessage(key, configuration, flags);
+ ExecuteSync(msg, ResultProcessor.DemandOK);
+ }
+
+ public Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None)
+ {
+ var msg = GetStreamConfigureMessage(key, configuration, flags);
+ return ExecuteAsync(msg, ResultProcessor.DemandOK);
+ }
+
+ private Message GetStreamConfigureMessage(RedisKey key, StreamConfiguration configuration, CommandFlags flags)
+ {
+ if (key.IsNull) throw new ArgumentNullException(nameof(key));
+ if (configuration == null) throw new ArgumentNullException(nameof(configuration));
+ if (configuration.IdmpMaxSize.HasValue)
+ {
+ if (configuration.IdmpDuration.HasValue)
+ {
+ // duration and maxsize
+ return Message.Create(
+ Database,
+ flags,
+ RedisCommand.XCFGSET,
+ key,
+ RedisLiterals.IDMP_DURATION,
+ configuration.IdmpDuration.Value,
+ RedisLiterals.IDMP_MAXSIZE,
+ configuration.IdmpMaxSize.Value);
+ }
+ // just maxsize
+ return Message.Create(
+ Database,
+ flags,
+ RedisCommand.XCFGSET,
+ key,
+ RedisLiterals.IDMP_MAXSIZE,
+ configuration.IdmpMaxSize.Value);
+ }
+
+ if (configuration.IdmpDuration.HasValue)
+ {
+ // just duration
+ return Message.Create(
+ Database,
+ flags,
+ RedisCommand.XCFGSET,
+ key,
+ RedisLiterals.IDMP_DURATION,
+ configuration.IdmpDuration.Value);
+ }
+
+ return Message.Create(Database, flags, RedisCommand.XCFGSET, key); // this will manifest a -ERR, but let's use the server's message
+ }
+
public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: false, flags);
@@ -4627,13 +4751,14 @@ private Message GetStreamAcknowledgeAndDeleteMessage(RedisKey key, RedisValue gr
return Message.Create(Database, flags, RedisCommand.XACKDEL, key, values);
}
- private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, long? limit, StreamTrimMode mode, CommandFlags flags)
+ private Message GetStreamAddMessage(in RedisKey key, RedisValue messageId, in StreamIdempotentId idempotentId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, long? limit, StreamTrimMode mode, CommandFlags flags)
{
// Calculate the correct number of arguments:
// 3 array elements for Entry ID & NameValueEntry.Name & NameValueEntry.Value.
// 2 elements if using MAXLEN (keyword & value), otherwise 0.
// 1 element if using Approximate Length (~), otherwise 0.
var totalLength = 3 + (maxLength.HasValue ? 2 : 0)
+ + idempotentId.ArgCount
+ (maxLength.HasValue && useApproximateMaxLength ? 1 : 0)
+ (limit.HasValue ? 2 : 0)
+ (mode != StreamTrimMode.KeepReferences ? 1 : 0);
@@ -4664,6 +4789,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? ma
values[offset++] = StreamConstants.GetMode(mode);
}
+ idempotentId.WriteTo(values, ref offset);
+
values[offset++] = messageId;
values[offset++] = streamPair.Name;
@@ -4676,7 +4803,7 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? ma
///
/// Gets message for .
///
- private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, long? limit, StreamTrimMode mode, CommandFlags flags)
+ private Message GetStreamAddMessage(in RedisKey key, RedisValue entryId, in StreamIdempotentId idempotentId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, long? limit, StreamTrimMode mode, CommandFlags flags)
{
if (streamPairs == null) throw new ArgumentNullException(nameof(streamPairs));
if (streamPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPairs), "streamPairs must contain at least one item.");
@@ -4688,6 +4815,7 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxL
var totalLength = (streamPairs.Length * 2) // Room for the name/value pairs
+ 1 // The stream entry ID
+ + idempotentId.ArgCount
+ (maxLength.HasValue ? 2 : 0) // MAXLEN N
+ (maxLength.HasValue && useApproximateMaxLength ? 1 : 0) // ~
+ (mode == StreamTrimMode.KeepReferences ? 0 : 1) // relevant trim-mode keyword
@@ -4720,6 +4848,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxL
values[offset++] = StreamConstants.GetMode(mode);
}
+ idempotentId.WriteTo(values, ref offset);
+
values[offset++] = entryId;
for (var i = 0; i < streamPairs.Length; i++)
diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs
index d097e418c..d0ea77707 100644
--- a/src/StackExchange.Redis/RedisFeatures.cs
+++ b/src/StackExchange.Redis/RedisFeatures.cs
@@ -48,7 +48,8 @@ namespace StackExchange.Redis
v7_4_0 = new Version(7, 4, 0),
v8_0_0_M04 = new Version(7, 9, 227), // 8.0 M04 is version 7.9.227
v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240
- v8_4_0_rc1 = new Version(8, 3, 224); // 8.4 RC1 is version 8.3.224
+ v8_4_0_rc1 = new Version(8, 3, 224),
+ v8_6_0 = new Version(8, 5, 999); // 8.4 RC1 is version 8.3.224
#pragma warning restore SA1310 // Field names should not contain underscore
#pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter
diff --git a/src/StackExchange.Redis/RedisLiterals.cs b/src/StackExchange.Redis/RedisLiterals.cs
index 9a8c15613..dd1522d71 100644
--- a/src/StackExchange.Redis/RedisLiterals.cs
+++ b/src/StackExchange.Redis/RedisLiterals.cs
@@ -4,13 +4,14 @@ namespace StackExchange.Redis
{
#pragma warning disable SA1310 // Field names should not contain underscore
#pragma warning disable SA1311 // Static readonly fields should begin with upper-case letter
- internal static class CommonReplies
+ internal static partial class CommonReplies
{
public static readonly CommandBytes
ASK = "ASK ",
authFail_trimmed = CommandBytes.TrimToFit("ERR operation not permitted"),
backgroundSavingStarted_trimmed = CommandBytes.TrimToFit("Background saving started"),
- backgroundSavingAOFStarted_trimmed = CommandBytes.TrimToFit("Background append only file rewriting started"),
+ backgroundSavingAOFStarted_trimmed =
+ CommandBytes.TrimToFit("Background append only file rewriting started"),
databases = "databases",
loading = "LOADING ",
MOVED = "MOVED ",
@@ -30,15 +31,6 @@ public static readonly CommandBytes
yes = "yes",
zero = "0",
- // streams
- length = "length",
- radixTreeKeys = "radix-tree-keys",
- radixTreeNodes = "radix-tree-nodes",
- groups = "groups",
- lastGeneratedId = "last-generated-id",
- firstEntry = "first-entry",
- lastEntry = "last-entry",
-
// HELLO
version = "version",
proto = "proto",
@@ -46,6 +38,32 @@ public static readonly CommandBytes
mode = "mode",
id = "id";
}
+
+ internal static partial class CommonRepliesHash
+ {
+#pragma warning disable CS8981, SA1300, SA1134 // forgive naming
+ // ReSharper disable InconsistentNaming
+ [FastHash] internal static partial class length { }
+ [FastHash] internal static partial class radix_tree_keys { }
+ [FastHash] internal static partial class radix_tree_nodes { }
+ [FastHash] internal static partial class last_generated_id { }
+ [FastHash] internal static partial class max_deleted_entry_id { }
+ [FastHash] internal static partial class entries_added { }
+ [FastHash] internal static partial class recorded_first_entry_id { }
+ [FastHash] internal static partial class idmp_duration { }
+ [FastHash] internal static partial class idmp_maxsize { }
+ [FastHash] internal static partial class pids_tracked { }
+ [FastHash] internal static partial class first_entry { }
+ [FastHash] internal static partial class last_entry { }
+ [FastHash] internal static partial class groups { }
+ [FastHash] internal static partial class iids_tracked { }
+ [FastHash] internal static partial class iids_added { }
+ [FastHash] internal static partial class iids_duplicates { }
+
+ // ReSharper restore InconsistentNaming
+#pragma warning restore CS8981, SA1300, SA1134 // forgive naming
+ }
+
internal static class RedisLiterals
{
// unlike primary commands, these do not get altered by the command-map; we may as
@@ -93,6 +111,10 @@ public static readonly RedisValue
ID = "ID",
IDX = "IDX",
IDLETIME = "IDLETIME",
+ IDMP = "IDMP",
+ IDMPAUTO = "IDMPAUTO",
+ IDMP_DURATION = "IDMP-DURATION",
+ IDMP_MAXSIZE = "IDMP-MAXSIZE",
KEEPTTL = "KEEPTTL",
KILL = "KILL",
LADDR = "LADDR",
diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs
index f2c6deb8b..926fe8950 100644
--- a/src/StackExchange.Redis/ResultProcessor.cs
+++ b/src/StackExchange.Redis/ResultProcessor.cs
@@ -2537,43 +2537,71 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var arr = result.GetItems();
var max = arr.Length / 2;
- long length = -1, radixTreeKeys = -1, radixTreeNodes = -1, groups = -1;
- var lastGeneratedId = Redis.RedisValue.Null;
+ long length = -1, radixTreeKeys = -1, radixTreeNodes = -1, groups = -1,
+ entriesAdded = -1, idmpDuration = -1, idmpMaxsize = -1,
+ pidsTracked = -1, iidsTracked = -1, iidsAdded = -1, iidsDuplicates = -1;
+ RedisValue lastGeneratedId = Redis.RedisValue.Null,
+ maxDeletedEntryId = Redis.RedisValue.Null,
+ recordedFirstEntryId = Redis.RedisValue.Null;
StreamEntry firstEntry = StreamEntry.Null, lastEntry = StreamEntry.Null;
var iter = arr.GetEnumerator();
for (int i = 0; i < max; i++)
{
ref RawResult key = ref iter.GetNext(), value = ref iter.GetNext();
if (key.Payload.Length > CommandBytes.MaxLength) continue;
-
- var keyBytes = new CommandBytes(key.Payload);
- if (keyBytes.Equals(CommonReplies.length))
- {
- if (!value.TryGetInt64(out length)) return false;
- }
- else if (keyBytes.Equals(CommonReplies.radixTreeKeys))
- {
- if (!value.TryGetInt64(out radixTreeKeys)) return false;
- }
- else if (keyBytes.Equals(CommonReplies.radixTreeNodes))
- {
- if (!value.TryGetInt64(out radixTreeNodes)) return false;
- }
- else if (keyBytes.Equals(CommonReplies.groups))
- {
- if (!value.TryGetInt64(out groups)) return false;
- }
- else if (keyBytes.Equals(CommonReplies.lastGeneratedId))
- {
- lastGeneratedId = value.AsRedisValue();
- }
- else if (keyBytes.Equals(CommonReplies.firstEntry))
- {
- firstEntry = ParseRedisStreamEntry(value);
- }
- else if (keyBytes.Equals(CommonReplies.lastEntry))
+ var hash = key.Payload.Hash64();
+ switch (hash)
{
- lastEntry = ParseRedisStreamEntry(value);
+ case CommonRepliesHash.length.Hash when CommonRepliesHash.length.Is(hash, key):
+ if (!value.TryGetInt64(out length)) return false;
+ break;
+ case CommonRepliesHash.radix_tree_keys.Hash when CommonRepliesHash.radix_tree_keys.Is(hash, key):
+ if (!value.TryGetInt64(out radixTreeKeys)) return false;
+ break;
+ case CommonRepliesHash.radix_tree_nodes.Hash when CommonRepliesHash.radix_tree_nodes.Is(hash, key):
+ if (!value.TryGetInt64(out radixTreeNodes)) return false;
+ break;
+ case CommonRepliesHash.groups.Hash when CommonRepliesHash.groups.Is(hash, key):
+ if (!value.TryGetInt64(out groups)) return false;
+ break;
+ case CommonRepliesHash.last_generated_id.Hash when CommonRepliesHash.last_generated_id.Is(hash, key):
+ lastGeneratedId = value.AsRedisValue();
+ break;
+ case CommonRepliesHash.first_entry.Hash when CommonRepliesHash.first_entry.Is(hash, key):
+ firstEntry = ParseRedisStreamEntry(value);
+ break;
+ case CommonRepliesHash.last_entry.Hash when CommonRepliesHash.last_entry.Is(hash, key):
+ lastEntry = ParseRedisStreamEntry(value);
+ break;
+ // 7.0
+ case CommonRepliesHash.max_deleted_entry_id.Hash when CommonRepliesHash.max_deleted_entry_id.Is(hash, key):
+ maxDeletedEntryId = value.AsRedisValue();
+ break;
+ case CommonRepliesHash.recorded_first_entry_id.Hash when CommonRepliesHash.recorded_first_entry_id.Is(hash, key):
+ recordedFirstEntryId = value.AsRedisValue();
+ break;
+ case CommonRepliesHash.entries_added.Hash when CommonRepliesHash.entries_added.Is(hash, key):
+ if (!value.TryGetInt64(out entriesAdded)) return false;
+ break;
+ // 8.6
+ case CommonRepliesHash.idmp_duration.Hash when CommonRepliesHash.idmp_duration.Is(hash, key):
+ if (!value.TryGetInt64(out idmpDuration)) return false;
+ break;
+ case CommonRepliesHash.idmp_maxsize.Hash when CommonRepliesHash.idmp_maxsize.Is(hash, key):
+ if (!value.TryGetInt64(out idmpMaxsize)) return false;
+ break;
+ case CommonRepliesHash.pids_tracked.Hash when CommonRepliesHash.pids_tracked.Is(hash, key):
+ if (!value.TryGetInt64(out pidsTracked)) return false;
+ break;
+ case CommonRepliesHash.iids_tracked.Hash when CommonRepliesHash.iids_tracked.Is(hash, key):
+ if (!value.TryGetInt64(out iidsTracked)) return false;
+ break;
+ case CommonRepliesHash.iids_added.Hash when CommonRepliesHash.iids_added.Is(hash, key):
+ if (!value.TryGetInt64(out iidsAdded)) return false;
+ break;
+ case CommonRepliesHash.iids_duplicates.Hash when CommonRepliesHash.iids_duplicates.Is(hash, key):
+ if (!value.TryGetInt64(out iidsDuplicates)) return false;
+ break;
}
}
@@ -2584,7 +2612,16 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
groups: checked((int)groups),
firstEntry: firstEntry,
lastEntry: lastEntry,
- lastGeneratedId: lastGeneratedId);
+ lastGeneratedId: lastGeneratedId,
+ maxDeletedEntryId: maxDeletedEntryId,
+ entriesAdded: entriesAdded,
+ recordedFirstEntryId: recordedFirstEntryId,
+ idmpDuration: idmpDuration,
+ idmpMaxSize: idmpMaxsize,
+ pidsTracked: pidsTracked,
+ iidsTracked: iidsTracked,
+ iidsAdded: iidsAdded,
+ iidsDuplicates: iidsDuplicates);
SetResult(message, streamInfo);
return true;
diff --git a/src/StackExchange.Redis/StreamConfiguration.cs b/src/StackExchange.Redis/StreamConfiguration.cs
new file mode 100644
index 000000000..71bbe483e
--- /dev/null
+++ b/src/StackExchange.Redis/StreamConfiguration.cs
@@ -0,0 +1,20 @@
+using System.Diagnostics.CodeAnalysis;
+
+namespace StackExchange.Redis;
+
+///
+/// Configuration parameters for a stream, for example idempotent producer (IDMP) duration and maxsize.
+///
+[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+public sealed class StreamConfiguration
+{
+ ///
+ /// How long the server remembers each iid, in seconds.
+ ///
+ public long? IdmpDuration { get; set; }
+
+ ///
+ /// Maximum number of iids the server remembers per pid.
+ ///
+ public long? IdmpMaxSize { get; set; }
+}
diff --git a/src/StackExchange.Redis/StreamIdempotentId.cs b/src/StackExchange.Redis/StreamIdempotentId.cs
new file mode 100644
index 000000000..601890d1f
--- /dev/null
+++ b/src/StackExchange.Redis/StreamIdempotentId.cs
@@ -0,0 +1,82 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+namespace StackExchange.Redis;
+
+///
+/// The idempotent id for a stream entry, ensuring at-most-once production. Each producer should have a unique
+/// that is stable and consistent between runs. When adding stream entries, the
+/// caller can specify an that is unique and repeatable for a given data item, or omit it
+/// and let the server generate it from the content of the data item. In either event: duplicates are rejected.
+///
+[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
+public readonly struct StreamIdempotentId
+{
+ // note: if exposing wider, maybe expose as a by-ref property rather than a readonly field
+ internal static readonly StreamIdempotentId Empty = default;
+
+ ///
+ /// Create a new with the given producer id.
+ ///
+ public StreamIdempotentId(RedisValue producerId)
+ {
+ if (producerId.IsNull) throw new ArgumentNullException(nameof(producerId));
+ ProducerId = producerId;
+ IdempotentId = RedisValue.Null;
+ }
+
+ ///
+ /// The idempotent id for a stream entry, ensuring at-most-once production.
+ ///
+ public StreamIdempotentId(RedisValue producerId, RedisValue idempotentId)
+ {
+ if (!producerId.HasValue) throw new ArgumentNullException(nameof(producerId));
+ ProducerId = producerId;
+ IdempotentId = idempotentId; // can be explicit null, fine
+ }
+
+ ///
+ /// The producer of the idempotent id; this is fixed for a given data generator.
+ ///
+ public RedisValue ProducerId { get; }
+
+ ///
+ /// The optional idempotent id; this should be unique for a given data item. If omitted / null,
+ /// the server will generate the idempotent id from the content of the data item.
+ ///
+ public RedisValue IdempotentId { get; }
+
+ ///
+ public override string ToString()
+ {
+ if (IdempotentId.HasValue) return $"IDMP {ProducerId} {IdempotentId}";
+ if (ProducerId.HasValue) return $"IDMPAUTO {ProducerId}";
+ return "";
+ }
+
+ internal int ArgCount => IdempotentId.HasValue ? 3 : ProducerId.HasValue ? 2 : 0;
+
+ internal void WriteTo(RedisValue[] args, ref int index)
+ {
+ if (IdempotentId.HasValue)
+ {
+ args[index++] = RedisLiterals.IDMP;
+ args[index++] = ProducerId;
+ args[index++] = IdempotentId;
+ }
+ else if (ProducerId.HasValue)
+ {
+ args[index++] = RedisLiterals.IDMPAUTO;
+ args[index++] = ProducerId;
+ }
+ }
+
+ ///
+ public override int GetHashCode() => ProducerId.GetHashCode() ^ IdempotentId.GetHashCode();
+
+ ///
+ public override bool Equals(object? obj) =>
+ obj is StreamIdempotentId other
+ && ProducerId == other.ProducerId
+ && IdempotentId == other.IdempotentId;
+}
diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs
index 2419f673a..22d2a0159 100644
--- a/tests/StackExchange.Redis.Tests/StreamTests.cs
+++ b/tests/StackExchange.Redis.Tests/StreamTests.cs
@@ -81,6 +81,102 @@ public async Task StreamAddWithManualId()
Assert.Equal(id, messageId);
}
+ [Theory]
+ [InlineData(false, false, false)]
+ [InlineData(false, false, true)]
+ [InlineData(false, true, false)]
+ [InlineData(false, true, true)]
+ [InlineData(true, false, false)]
+ [InlineData(true, false, true)]
+ [InlineData(true, true, false)]
+ [InlineData(true, true, true)]
+ public async Task StreamAddIdempotentId(bool iid, bool pairs, bool async)
+ {
+ await using var conn = Create(require: RedisFeatures.v8_6_0);
+ var db = conn.GetDatabase();
+ StreamIdempotentId id = iid ? new StreamIdempotentId("pid", "iid") : new StreamIdempotentId("pid");
+ Log($"id: {id}");
+ var key = Me();
+ await db.KeyDeleteAsync(key, CommandFlags.FireAndForget);
+
+ async Task Add()
+ {
+ if (pairs)
+ {
+ NameValueEntry[] fields = [new("field1", "value1"), new("field2", "value2"), new("field3", "value3")];
+ if (async)
+ {
+ return await db.StreamAddAsync(key, fields, idempotentId: id);
+ }
+
+ return db.StreamAdd(key, fields, idempotentId: id);
+ }
+
+ if (async)
+ {
+ return await db.StreamAddAsync(key, "field1", "value1", idempotentId: id);
+ }
+
+ return db.StreamAdd(key, "field1", "value1", idempotentId: id);
+ }
+
+ RedisValue first = await Add();
+ Log($"Message ID: {first}");
+
+ RedisValue second = await Add();
+ Assert.Equal(first, second); // idempotent id has avoided a duplicate
+ }
+
+ [Theory]
+ [InlineData(null, null, false)]
+ [InlineData(null, 42, false)]
+ [InlineData(13, null, false)]
+ [InlineData(13, 42, false)]
+ [InlineData(null, null, true)]
+ [InlineData(null, 42, true)]
+ [InlineData(13, null, true)]
+ [InlineData(13, 42, true)]
+ public async Task StreamConfigure(int? duration, int? maxsize, bool async)
+ {
+ await using var conn = Create(require: RedisFeatures.v8_6_0);
+ var db = conn.GetDatabase();
+
+ var key = Me();
+ await db.KeyDeleteAsync(key, CommandFlags.FireAndForget);
+ var id = await db.StreamAddAsync(key, "field1", "value1");
+ Log($"id: {id}");
+ var settings = new StreamConfiguration { IdmpDuration = duration, IdmpMaxSize = maxsize };
+ bool doomed = duration is null && maxsize is null;
+ if (async)
+ {
+ if (doomed)
+ {
+ var ex = await Assert.ThrowsAsync(async () => await db.StreamConfigureAsync(key, settings));
+ Assert.StartsWith("ERR At least one parameter must be specified", ex.Message);
+ }
+ else
+ {
+ await db.StreamConfigureAsync(key, settings);
+ }
+ }
+ else
+ {
+ if (doomed)
+ {
+ var ex = Assert.Throws(() => db.StreamConfigure(key, settings));
+ Assert.StartsWith("ERR At least one parameter must be specified", ex.Message);
+ }
+ else
+ {
+ db.StreamConfigure(key, settings);
+ }
+ }
+ var info = async ? await db.StreamInfoAsync(key) : db.StreamInfo(key);
+ const int SERVER_DEFAULT = 100;
+ Assert.Equal(duration ?? SERVER_DEFAULT, info.IdmpDuration);
+ Assert.Equal(maxsize ?? SERVER_DEFAULT, info.IdmpMaxSize);
+ }
+
[Fact]
public async Task StreamAddMultipleValuePairsWithManualId()
{
@@ -1562,16 +1658,51 @@ public async Task StreamInfoGet()
var id1 = db.StreamAdd(key, "field1", "value1");
db.StreamAdd(key, "field2", "value2");
- db.StreamAdd(key, "field3", "value3");
- var id4 = db.StreamAdd(key, "field4", "value4");
-
+ var id3 = db.StreamAdd(key, "field3", "value3");
+ db.StreamAdd(key, "field4", "value4");
+ var id5 = db.StreamAdd(key, "field5", "value5");
+ db.StreamDelete(key, [id3]);
var streamInfo = db.StreamInfo(key);
Assert.Equal(4, streamInfo.Length);
Assert.True(streamInfo.RadixTreeKeys > 0);
Assert.True(streamInfo.RadixTreeNodes > 0);
Assert.Equal(id1, streamInfo.FirstEntry.Id);
- Assert.Equal(id4, streamInfo.LastEntry.Id);
+ Assert.Equal(id5, streamInfo.LastEntry.Id);
+
+ var server = conn.GetServer(conn.GetEndPoints().First());
+ Log($"server version: {server.Version}");
+ if (server.Version.IsAtLeast(RedisFeatures.v7_0_0_rc1))
+ {
+ Assert.Equal(id3, streamInfo.MaxDeletedEntryId);
+ Assert.Equal(5, streamInfo.EntriesAdded);
+ Assert.False(streamInfo.RecordedFirstEntryId.IsNull);
+ }
+ else
+ {
+ Assert.True(streamInfo.MaxDeletedEntryId.IsNull);
+ Assert.Equal(-1, streamInfo.EntriesAdded);
+ Assert.True(streamInfo.RecordedFirstEntryId.IsNull);
+ }
+
+ if (server.Version.IsAtLeast(RedisFeatures.v8_6_0))
+ {
+ Assert.True(streamInfo.IdmpDuration > 0);
+ Assert.True(streamInfo.IdmpMaxSize > 0);
+ Assert.Equal(0, streamInfo.PidsTracked);
+ Assert.Equal(0, streamInfo.IidsTracked);
+ Assert.Equal(0, streamInfo.IidsDuplicates);
+ Assert.Equal(0, streamInfo.IidsAdded);
+ }
+ else
+ {
+ Assert.Equal(-1, streamInfo.IdmpDuration);
+ Assert.Equal(-1, streamInfo.IdmpMaxSize);
+ Assert.Equal(-1, streamInfo.PidsTracked);
+ Assert.Equal(-1, streamInfo.IidsTracked);
+ Assert.Equal(-1, streamInfo.IidsDuplicates);
+ Assert.Equal(-1, streamInfo.IidsAdded);
+ }
}
[Fact]
@@ -2188,26 +2319,34 @@ public void StreamTrimByMinIdWithApproximateAndLimit(StreamTrimMode mode)
var db = conn.GetDatabase();
var key = Me() + ":" + mode;
- const int maxLength = 1000;
- const int limit = 100;
+ const int maxLength = 100;
+ const int limit = 10;
+ // The behavior of ACKED etc is undefined when there are no consumer groups; or rather,
+ // it *is* defined, but it is defined/implemented differently < and >= server 8.6
+ // This *does* have the side-effect that the 3 modes behave the same in this test,
+ // but: we're trying to test the API, not the server.
+ const string groupName = "test_group", consumer = "consumer";
+ db.StreamCreateConsumerGroup(key, groupName, StreamPosition.NewMessages);
for (var i = 0; i < maxLength; i++)
{
db.StreamAdd(key, $"field", $"value", 1111111110 + i);
}
+ var entries = db.StreamReadGroup(
+ key,
+ groupName,
+ consumer,
+ StreamPosition.NewMessages);
+
+ Assert.Equal(maxLength, entries.Length);
+
var numRemoved = db.StreamTrimByMinId(key, 1111111110 + maxLength, useApproximateMaxLength: true, limit: limit, mode: mode);
- var expectRemoved = mode switch
- {
- StreamTrimMode.KeepReferences => limit,
- StreamTrimMode.DeleteReferences => 0,
- StreamTrimMode.Acknowledged => 0,
- _ => throw new ArgumentOutOfRangeException(nameof(mode)),
- };
+ const int EXPECT_REMOVED = 0;
var len = db.StreamLength(key);
- Assert.Equal(expectRemoved, numRemoved);
- Assert.Equal(maxLength - expectRemoved, len);
+ Assert.Equal(EXPECT_REMOVED, numRemoved);
+ Assert.Equal(maxLength - EXPECT_REMOVED, len);
}
[Fact]