diff --git a/Directory.Build.props b/Directory.Build.props index 06542aa32..e36f0f7d1 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -10,7 +10,7 @@ true $(MSBuildThisFileDirectory)Shared.ruleset NETSDK1069 - $(NoWarn);NU5105;NU1507;SER001;SER002 + $(NoWarn);NU5105;NU1507;SER001;SER002;SER003 https://stackexchange.github.io/StackExchange.Redis/ReleaseNotes https://stackexchange.github.io/StackExchange.Redis/ MIT diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index f77a1d10a..2ccc6d2d0 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -6,6 +6,10 @@ Current package versions: | ------------ | ----------------- | ----- | | [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) | +## unreleased + +- Implement idempotent stream entry (IDMP) support ([#3006 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3006)) + ## 2.10.14 - Fix bug with connection startup failing in low-memory scenarios ([#3002 by nathan-miller23](https://github.com/StackExchange/StackExchange.Redis/pull/3002)) diff --git a/docs/Streams.md b/docs/Streams.md index 4378a528d..47e82c2b9 100644 --- a/docs/Streams.md +++ b/docs/Streams.md @@ -37,6 +37,24 @@ You also have the option to override the auto-generated message ID by passing yo db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100); ``` +Idempotent write-at-most-once production +=== + +From Redis 8.6, streams support idempotent write-at-most-once production. This is achieved by passing a `StreamIdempotentId` to the `StreamAdd` method. Using idempotent ids avoids +duplicate entries in the stream, even in the event of a failure and retry. + +The `StreamIdempotentId` contains a producer id and an optional idempotent id. The producer id should be unique for a given data generator and should be stable and consistent between runs. +The optional idempotent id should be unique for a given data item. If the idempotent id is not provided, the server will generate it from the content of the data item. + +```csharp +// int someUniqueExternalSourceId = ... // optional +var idempotentId = new StreamIdempotentId("ticket_generator"); +// optionally, new StreamIdempotentId("ticket_generator", someUniqueExternalSourceId) +var messageId = db.StreamAdd("events_stream", "foo_name", "bar_value", idempotentId); +``` + +~~~~The `StreamConfigure` method can be used to configure the stream, in particular the IDMP map. The `StreamConfiguration` class has properties for the idempotent producer (IDMP) duration and max-size. + Reading from Streams === diff --git a/docs/exp/SER003.md b/docs/exp/SER003.md new file mode 100644 index 000000000..651434063 --- /dev/null +++ b/docs/exp/SER003.md @@ -0,0 +1,25 @@ +Redis 8.6 is currently in preview and may be subject to change. + +New features in Redis 8.6 include: + +- `HOTKEYS` for profiling CPU and network hot-spots by key +- `XADD IDMP[AUTP]` for idempotent (write-at-most-once) stream addition + +The corresponding library feature must also be considered subject to change: + +1. Existing bindings may cease working correctly if the underlying server API changes. +2. Changes to the server API may require changes to the library API, manifesting in either/both of build-time + or run-time breaks. + +While this seems *unlikely*, it must be considered a possibility. If you acknowledge this, you can suppress +this warning by adding the following to your `csproj` file: + +```xml +$(NoWarn);SER003 +``` + +or more granularly / locally in C#: + +``` c# +#pragma warning disable SER003 +``` diff --git a/src/StackExchange.Redis/APITypes/StreamInfo.cs b/src/StackExchange.Redis/APITypes/StreamInfo.cs index 230ea47fb..e37df5add 100644 --- a/src/StackExchange.Redis/APITypes/StreamInfo.cs +++ b/src/StackExchange.Redis/APITypes/StreamInfo.cs @@ -1,11 +1,31 @@ -namespace StackExchange.Redis; +using System.Diagnostics.CodeAnalysis; + +namespace StackExchange.Redis; /// /// Describes stream information retrieved using the XINFO STREAM command. . /// public readonly struct StreamInfo { - internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int groups, StreamEntry firstEntry, StreamEntry lastEntry, RedisValue lastGeneratedId) + // OK, I accept that this parameter list / size is getting silly, but: it is too late + // to refactor this as a class. + internal StreamInfo( + int length, + int radixTreeKeys, + int radixTreeNodes, + int groups, + StreamEntry firstEntry, + StreamEntry lastEntry, + RedisValue lastGeneratedId, + RedisValue maxDeletedEntryId, + long entriesAdded, + RedisValue recordedFirstEntryId, + long idmpDuration, + long idmpMaxSize, + long pidsTracked, + long iidsTracked, + long iidsAdded, + long iidsDuplicates) { Length = length; RadixTreeKeys = radixTreeKeys; @@ -14,6 +34,19 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group FirstEntry = firstEntry; LastEntry = lastEntry; LastGeneratedId = lastGeneratedId; + + // 7.0 + MaxDeletedEntryId = maxDeletedEntryId; + EntriesAdded = entriesAdded; + RecordedFirstEntryId = recordedFirstEntryId; + + // 8.6 + IdmpDuration = idmpDuration; + IdmpMaxSize = idmpMaxSize; + PidsTracked = pidsTracked; + IidsTracked = iidsTracked; + IidsAdded = iidsAdded; + IidsDuplicates = iidsDuplicates; } /// @@ -50,4 +83,76 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group /// The last generated id. /// public RedisValue LastGeneratedId { get; } + + /// + /// The first id recorded for the stream. + /// + public RedisValue RecordedFirstEntryId { get; } + + /// + /// The count of all entries added to the stream during its lifetime. + /// + public long EntriesAdded { get; } + + /// + /// The maximal entry ID that was deleted from the stream. + /// + public RedisValue MaxDeletedEntryId { get; } + + /// + /// The duration value configured for the stream’s IDMP map (seconds), or -1 if unavailable. + /// + public long IdmpDuration + { + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + get; + } + + /// + /// The maxsize value configured for the stream’s IDMP map, or -1 if unavailable. + /// + public long IdmpMaxSize + { + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + get; + } + + /// + /// The number of idempotent pids currently tracked in the stream, or -1 if unavailable. + /// + public long PidsTracked + { + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + get; + } + + /// + /// The number of idempotent ids currently tracked in the stream, or -1 if unavailable. + /// This count reflects active iids that haven't expired or been evicted yet. + /// + public long IidsTracked + { + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + get; + } + + /// + /// The count of all entries with an idempotent iid added to the stream during its lifetime, or -1 if unavailable. + /// This is a cumulative counter that increases with each idempotent entry added. + /// + public long IidsAdded + { + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + get; + } + + /// + /// The count of all duplicate iids (for all pids) detected during the stream's lifetime, or -1 if unavailable. + /// This is a cumulative counter that increases with each duplicate iid. + /// + public long IidsDuplicates + { + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + get; + } } diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index 14f304a35..f731a6676 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -229,6 +229,7 @@ internal enum RedisCommand XADD, XAUTOCLAIM, XCLAIM, + XCFGSET, XDEL, XDELEX, XGROUP, @@ -375,6 +376,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command) case RedisCommand.VREM: case RedisCommand.VSETATTR: case RedisCommand.XAUTOCLAIM: + case RedisCommand.XCFGSET: case RedisCommand.ZADD: case RedisCommand.ZDIFFSTORE: case RedisCommand.ZINTERSTORE: diff --git a/src/StackExchange.Redis/Experiments.cs b/src/StackExchange.Redis/Experiments.cs index 441b0ec54..547838873 100644 --- a/src/StackExchange.Redis/Experiments.cs +++ b/src/StackExchange.Redis/Experiments.cs @@ -12,6 +12,8 @@ internal static class Experiments public const string VectorSets = "SER001"; // ReSharper disable once InconsistentNaming public const string Server_8_4 = "SER002"; + // ReSharper disable once InconsistentNaming + public const string Server_8_6 = "SER003"; } } diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 3df162682..cf2ecafac 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2662,7 +2662,27 @@ IEnumerable SortedSetScan( /// #pragma warning disable RS0026 // different shape RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); -#pragma warning restore RS0026 + + /// + /// Adds an entry using the specified values to the given stream key. + /// If key does not exist, a new key holding a stream is created. + /// The command returns the ID of the newly created stream entry, using + /// the idempotent id (pid/iid) mechanism to ensure at-most-once production. + /// See for more information of the idempotent API. + /// + /// The key of the stream. + /// The field name for the stream entry. + /// The value to set in the stream entry. + /// The idempotent producer (pid) and optionally id (iid) to use for this entry. + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// Specifies the maximal count of entries that will be evicted. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); /// /// Adds an entry using the specified values to the given stream key. @@ -2679,10 +2699,38 @@ IEnumerable SortedSetScan( /// The flags to use for this operation. /// The ID of the newly created message. /// -#pragma warning disable RS0026 // different shape RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + + /// + /// Adds an entry using the specified values to the given stream key. + /// If key does not exist, a new key holding a stream is created. + /// The command returns the ID of the newly created stream entry, using + /// the idempotent id (pid/iid) mechanism to ensure at-most-once production. + /// See for more information of the idempotent API. + /// + /// The key of the stream. + /// The fields and their associated values to set in the stream entry. + /// The idempotent producer (pid) and optionally id (iid) to use for this entry. + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// Specifies the maximal count of entries that will be evicted. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 + /// + /// Configures a stream, in particular the IDMP map. + /// + /// The key of the stream. + /// The configuration to apply. + /// The flags to use for this operation. + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None); + /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. /// Messages that have been idle for more than will be claimed. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 855ea6c8f..029c7975e 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.ComponentModel; using System.Diagnostics.CodeAnalysis; +using System.IO; using System.Net; using System.Threading.Tasks; @@ -655,8 +656,20 @@ IAsyncEnumerable SortedSetScanAsync( /// Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + + /// + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + + /// + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 + /// + [Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] + Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None); + /// Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index fe23b73c1..c7831fdb8 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -621,6 +621,15 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAddAsync(ToInner(key), streamField, streamValue, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags); + + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAddAsync(ToInner(key), streamPairs, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags); + + public Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) => + Inner.StreamConfigureAsync(ToInner(key), configuration, flags); + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 69775c15d..01fe28505 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using System.Net; namespace StackExchange.Redis.KeyspaceIsolation @@ -603,6 +604,15 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAdd(ToInner(key), streamField, streamValue, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags); + + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAdd(ToInner(key), streamPairs, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags); + + public void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) => + Inner.StreamConfigure(ToInner(key), configuration, flags); + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index ab058de62..3a80ab570 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1 +1,31 @@ #nullable enable +[SER003]override StackExchange.Redis.StreamIdempotentId.Equals(object? obj) -> bool +[SER003]override StackExchange.Redis.StreamIdempotentId.GetHashCode() -> int +[SER003]override StackExchange.Redis.StreamIdempotentId.ToString() -> string! +[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +[SER003]StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +[SER003]StackExchange.Redis.IDatabase.StreamConfigure(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> void +[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER003]StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER003]StackExchange.Redis.IDatabaseAsync.StreamConfigureAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.StreamConfiguration! configuration, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +[SER003]StackExchange.Redis.StreamConfiguration +[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.get -> long? +[SER003]StackExchange.Redis.StreamConfiguration.IdmpDuration.set -> void +[SER003]StackExchange.Redis.StreamConfiguration.IdmpMaxSize.get -> long? +[SER003]StackExchange.Redis.StreamConfiguration.IdmpMaxSize.set -> void +[SER003]StackExchange.Redis.StreamConfiguration.StreamConfiguration() -> void +[SER003]StackExchange.Redis.StreamIdempotentId +[SER003]StackExchange.Redis.StreamIdempotentId.IdempotentId.get -> StackExchange.Redis.RedisValue +[SER003]StackExchange.Redis.StreamIdempotentId.ProducerId.get -> StackExchange.Redis.RedisValue +[SER003]StackExchange.Redis.StreamIdempotentId.StreamIdempotentId() -> void +[SER003]StackExchange.Redis.StreamIdempotentId.StreamIdempotentId(StackExchange.Redis.RedisValue producerId) -> void +[SER003]StackExchange.Redis.StreamIdempotentId.StreamIdempotentId(StackExchange.Redis.RedisValue producerId, StackExchange.Redis.RedisValue idempotentId) -> void +[SER003]StackExchange.Redis.StreamInfo.IdmpDuration.get -> long +[SER003]StackExchange.Redis.StreamInfo.IdmpMaxSize.get -> long +[SER003]StackExchange.Redis.StreamInfo.IidsAdded.get -> long +[SER003]StackExchange.Redis.StreamInfo.IidsDuplicates.get -> long +[SER003]StackExchange.Redis.StreamInfo.IidsTracked.get -> long +[SER003]StackExchange.Redis.StreamInfo.PidsTracked.get -> long +StackExchange.Redis.StreamInfo.EntriesAdded.get -> long +StackExchange.Redis.StreamInfo.MaxDeletedEntryId.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.StreamInfo.RecordedFirstEntryId.get -> StackExchange.Redis.RedisValue diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 056a5380a..ac3c14bcc 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2782,6 +2782,23 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str var msg = GetStreamAddMessage( key, messageId ?? StreamConstants.AutoGeneratedId, + StreamIdempotentId.Empty, + maxLength, + useApproximateMaxLength, + new NameValueEntry(streamField, streamValue), + limit, + mode, + flags); + + return ExecuteSync(msg, ResultProcessor.RedisValue); + } + + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage( + key, + StreamConstants.AutoGeneratedId, + idempotentId, maxLength, useApproximateMaxLength, new NameValueEntry(streamField, streamValue), @@ -2800,6 +2817,23 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red var msg = GetStreamAddMessage( key, messageId ?? StreamConstants.AutoGeneratedId, + StreamIdempotentId.Empty, + maxLength, + useApproximateMaxLength, + new NameValueEntry(streamField, streamValue), + limit, + mode, + flags); + + return ExecuteAsync(msg, ResultProcessor.RedisValue); + } + + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage( + key, + StreamConstants.AutoGeneratedId, + idempotentId, maxLength, useApproximateMaxLength, new NameValueEntry(streamField, streamValue), @@ -2818,6 +2852,23 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal var msg = GetStreamAddMessage( key, messageId ?? StreamConstants.AutoGeneratedId, + StreamIdempotentId.Empty, + maxLength, + useApproximateMaxLength, + streamPairs, + limit, + mode, + flags); + + return ExecuteSync(msg, ResultProcessor.RedisValue); + } + + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage( + key, + StreamConstants.AutoGeneratedId, + idempotentId, maxLength, useApproximateMaxLength, streamPairs, @@ -2836,6 +2887,7 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair var msg = GetStreamAddMessage( key, messageId ?? StreamConstants.AutoGeneratedId, + StreamIdempotentId.Empty, maxLength, useApproximateMaxLength, streamPairs, @@ -2846,6 +2898,78 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair return ExecuteAsync(msg, ResultProcessor.RedisValue); } + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage( + key, + StreamConstants.AutoGeneratedId, + idempotentId, + maxLength, + useApproximateMaxLength, + streamPairs, + limit, + mode, + flags); + + return ExecuteAsync(msg, ResultProcessor.RedisValue); + } + + public void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamConfigureMessage(key, configuration, flags); + ExecuteSync(msg, ResultProcessor.DemandOK); + } + + public Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamConfigureMessage(key, configuration, flags); + return ExecuteAsync(msg, ResultProcessor.DemandOK); + } + + private Message GetStreamConfigureMessage(RedisKey key, StreamConfiguration configuration, CommandFlags flags) + { + if (key.IsNull) throw new ArgumentNullException(nameof(key)); + if (configuration == null) throw new ArgumentNullException(nameof(configuration)); + if (configuration.IdmpMaxSize.HasValue) + { + if (configuration.IdmpDuration.HasValue) + { + // duration and maxsize + return Message.Create( + Database, + flags, + RedisCommand.XCFGSET, + key, + RedisLiterals.IDMP_DURATION, + configuration.IdmpDuration.Value, + RedisLiterals.IDMP_MAXSIZE, + configuration.IdmpMaxSize.Value); + } + // just maxsize + return Message.Create( + Database, + flags, + RedisCommand.XCFGSET, + key, + RedisLiterals.IDMP_MAXSIZE, + configuration.IdmpMaxSize.Value); + } + + if (configuration.IdmpDuration.HasValue) + { + // just duration + return Message.Create( + Database, + flags, + RedisCommand.XCFGSET, + key, + RedisLiterals.IDMP_DURATION, + configuration.IdmpDuration.Value); + } + + return Message.Create(Database, flags, RedisCommand.XCFGSET, key); // this will manifest a -ERR, but let's use the server's message + } + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAutoClaimMessage(key, consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, idsOnly: false, flags); @@ -4627,13 +4751,14 @@ private Message GetStreamAcknowledgeAndDeleteMessage(RedisKey key, RedisValue gr return Message.Create(Database, flags, RedisCommand.XACKDEL, key, values); } - private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, long? limit, StreamTrimMode mode, CommandFlags flags) + private Message GetStreamAddMessage(in RedisKey key, RedisValue messageId, in StreamIdempotentId idempotentId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, long? limit, StreamTrimMode mode, CommandFlags flags) { // Calculate the correct number of arguments: // 3 array elements for Entry ID & NameValueEntry.Name & NameValueEntry.Value. // 2 elements if using MAXLEN (keyword & value), otherwise 0. // 1 element if using Approximate Length (~), otherwise 0. var totalLength = 3 + (maxLength.HasValue ? 2 : 0) + + idempotentId.ArgCount + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0) + (limit.HasValue ? 2 : 0) + (mode != StreamTrimMode.KeepReferences ? 1 : 0); @@ -4664,6 +4789,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? ma values[offset++] = StreamConstants.GetMode(mode); } + idempotentId.WriteTo(values, ref offset); + values[offset++] = messageId; values[offset++] = streamPair.Name; @@ -4676,7 +4803,7 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? ma /// /// Gets message for . /// - private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, long? limit, StreamTrimMode mode, CommandFlags flags) + private Message GetStreamAddMessage(in RedisKey key, RedisValue entryId, in StreamIdempotentId idempotentId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, long? limit, StreamTrimMode mode, CommandFlags flags) { if (streamPairs == null) throw new ArgumentNullException(nameof(streamPairs)); if (streamPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPairs), "streamPairs must contain at least one item."); @@ -4688,6 +4815,7 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxL var totalLength = (streamPairs.Length * 2) // Room for the name/value pairs + 1 // The stream entry ID + + idempotentId.ArgCount + (maxLength.HasValue ? 2 : 0) // MAXLEN N + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0) // ~ + (mode == StreamTrimMode.KeepReferences ? 0 : 1) // relevant trim-mode keyword @@ -4720,6 +4848,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxL values[offset++] = StreamConstants.GetMode(mode); } + idempotentId.WriteTo(values, ref offset); + values[offset++] = entryId; for (var i = 0; i < streamPairs.Length; i++) diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs index d097e418c..d0ea77707 100644 --- a/src/StackExchange.Redis/RedisFeatures.cs +++ b/src/StackExchange.Redis/RedisFeatures.cs @@ -48,7 +48,8 @@ namespace StackExchange.Redis v7_4_0 = new Version(7, 4, 0), v8_0_0_M04 = new Version(7, 9, 227), // 8.0 M04 is version 7.9.227 v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240 - v8_4_0_rc1 = new Version(8, 3, 224); // 8.4 RC1 is version 8.3.224 + v8_4_0_rc1 = new Version(8, 3, 224), + v8_6_0 = new Version(8, 5, 999); // 8.4 RC1 is version 8.3.224 #pragma warning restore SA1310 // Field names should not contain underscore #pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter diff --git a/src/StackExchange.Redis/RedisLiterals.cs b/src/StackExchange.Redis/RedisLiterals.cs index 9a8c15613..dd1522d71 100644 --- a/src/StackExchange.Redis/RedisLiterals.cs +++ b/src/StackExchange.Redis/RedisLiterals.cs @@ -4,13 +4,14 @@ namespace StackExchange.Redis { #pragma warning disable SA1310 // Field names should not contain underscore #pragma warning disable SA1311 // Static readonly fields should begin with upper-case letter - internal static class CommonReplies + internal static partial class CommonReplies { public static readonly CommandBytes ASK = "ASK ", authFail_trimmed = CommandBytes.TrimToFit("ERR operation not permitted"), backgroundSavingStarted_trimmed = CommandBytes.TrimToFit("Background saving started"), - backgroundSavingAOFStarted_trimmed = CommandBytes.TrimToFit("Background append only file rewriting started"), + backgroundSavingAOFStarted_trimmed = + CommandBytes.TrimToFit("Background append only file rewriting started"), databases = "databases", loading = "LOADING ", MOVED = "MOVED ", @@ -30,15 +31,6 @@ public static readonly CommandBytes yes = "yes", zero = "0", - // streams - length = "length", - radixTreeKeys = "radix-tree-keys", - radixTreeNodes = "radix-tree-nodes", - groups = "groups", - lastGeneratedId = "last-generated-id", - firstEntry = "first-entry", - lastEntry = "last-entry", - // HELLO version = "version", proto = "proto", @@ -46,6 +38,32 @@ public static readonly CommandBytes mode = "mode", id = "id"; } + + internal static partial class CommonRepliesHash + { +#pragma warning disable CS8981, SA1300, SA1134 // forgive naming + // ReSharper disable InconsistentNaming + [FastHash] internal static partial class length { } + [FastHash] internal static partial class radix_tree_keys { } + [FastHash] internal static partial class radix_tree_nodes { } + [FastHash] internal static partial class last_generated_id { } + [FastHash] internal static partial class max_deleted_entry_id { } + [FastHash] internal static partial class entries_added { } + [FastHash] internal static partial class recorded_first_entry_id { } + [FastHash] internal static partial class idmp_duration { } + [FastHash] internal static partial class idmp_maxsize { } + [FastHash] internal static partial class pids_tracked { } + [FastHash] internal static partial class first_entry { } + [FastHash] internal static partial class last_entry { } + [FastHash] internal static partial class groups { } + [FastHash] internal static partial class iids_tracked { } + [FastHash] internal static partial class iids_added { } + [FastHash] internal static partial class iids_duplicates { } + + // ReSharper restore InconsistentNaming +#pragma warning restore CS8981, SA1300, SA1134 // forgive naming + } + internal static class RedisLiterals { // unlike primary commands, these do not get altered by the command-map; we may as @@ -93,6 +111,10 @@ public static readonly RedisValue ID = "ID", IDX = "IDX", IDLETIME = "IDLETIME", + IDMP = "IDMP", + IDMPAUTO = "IDMPAUTO", + IDMP_DURATION = "IDMP-DURATION", + IDMP_MAXSIZE = "IDMP-MAXSIZE", KEEPTTL = "KEEPTTL", KILL = "KILL", LADDR = "LADDR", diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index f2c6deb8b..926fe8950 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -2537,43 +2537,71 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes var arr = result.GetItems(); var max = arr.Length / 2; - long length = -1, radixTreeKeys = -1, radixTreeNodes = -1, groups = -1; - var lastGeneratedId = Redis.RedisValue.Null; + long length = -1, radixTreeKeys = -1, radixTreeNodes = -1, groups = -1, + entriesAdded = -1, idmpDuration = -1, idmpMaxsize = -1, + pidsTracked = -1, iidsTracked = -1, iidsAdded = -1, iidsDuplicates = -1; + RedisValue lastGeneratedId = Redis.RedisValue.Null, + maxDeletedEntryId = Redis.RedisValue.Null, + recordedFirstEntryId = Redis.RedisValue.Null; StreamEntry firstEntry = StreamEntry.Null, lastEntry = StreamEntry.Null; var iter = arr.GetEnumerator(); for (int i = 0; i < max; i++) { ref RawResult key = ref iter.GetNext(), value = ref iter.GetNext(); if (key.Payload.Length > CommandBytes.MaxLength) continue; - - var keyBytes = new CommandBytes(key.Payload); - if (keyBytes.Equals(CommonReplies.length)) - { - if (!value.TryGetInt64(out length)) return false; - } - else if (keyBytes.Equals(CommonReplies.radixTreeKeys)) - { - if (!value.TryGetInt64(out radixTreeKeys)) return false; - } - else if (keyBytes.Equals(CommonReplies.radixTreeNodes)) - { - if (!value.TryGetInt64(out radixTreeNodes)) return false; - } - else if (keyBytes.Equals(CommonReplies.groups)) - { - if (!value.TryGetInt64(out groups)) return false; - } - else if (keyBytes.Equals(CommonReplies.lastGeneratedId)) - { - lastGeneratedId = value.AsRedisValue(); - } - else if (keyBytes.Equals(CommonReplies.firstEntry)) - { - firstEntry = ParseRedisStreamEntry(value); - } - else if (keyBytes.Equals(CommonReplies.lastEntry)) + var hash = key.Payload.Hash64(); + switch (hash) { - lastEntry = ParseRedisStreamEntry(value); + case CommonRepliesHash.length.Hash when CommonRepliesHash.length.Is(hash, key): + if (!value.TryGetInt64(out length)) return false; + break; + case CommonRepliesHash.radix_tree_keys.Hash when CommonRepliesHash.radix_tree_keys.Is(hash, key): + if (!value.TryGetInt64(out radixTreeKeys)) return false; + break; + case CommonRepliesHash.radix_tree_nodes.Hash when CommonRepliesHash.radix_tree_nodes.Is(hash, key): + if (!value.TryGetInt64(out radixTreeNodes)) return false; + break; + case CommonRepliesHash.groups.Hash when CommonRepliesHash.groups.Is(hash, key): + if (!value.TryGetInt64(out groups)) return false; + break; + case CommonRepliesHash.last_generated_id.Hash when CommonRepliesHash.last_generated_id.Is(hash, key): + lastGeneratedId = value.AsRedisValue(); + break; + case CommonRepliesHash.first_entry.Hash when CommonRepliesHash.first_entry.Is(hash, key): + firstEntry = ParseRedisStreamEntry(value); + break; + case CommonRepliesHash.last_entry.Hash when CommonRepliesHash.last_entry.Is(hash, key): + lastEntry = ParseRedisStreamEntry(value); + break; + // 7.0 + case CommonRepliesHash.max_deleted_entry_id.Hash when CommonRepliesHash.max_deleted_entry_id.Is(hash, key): + maxDeletedEntryId = value.AsRedisValue(); + break; + case CommonRepliesHash.recorded_first_entry_id.Hash when CommonRepliesHash.recorded_first_entry_id.Is(hash, key): + recordedFirstEntryId = value.AsRedisValue(); + break; + case CommonRepliesHash.entries_added.Hash when CommonRepliesHash.entries_added.Is(hash, key): + if (!value.TryGetInt64(out entriesAdded)) return false; + break; + // 8.6 + case CommonRepliesHash.idmp_duration.Hash when CommonRepliesHash.idmp_duration.Is(hash, key): + if (!value.TryGetInt64(out idmpDuration)) return false; + break; + case CommonRepliesHash.idmp_maxsize.Hash when CommonRepliesHash.idmp_maxsize.Is(hash, key): + if (!value.TryGetInt64(out idmpMaxsize)) return false; + break; + case CommonRepliesHash.pids_tracked.Hash when CommonRepliesHash.pids_tracked.Is(hash, key): + if (!value.TryGetInt64(out pidsTracked)) return false; + break; + case CommonRepliesHash.iids_tracked.Hash when CommonRepliesHash.iids_tracked.Is(hash, key): + if (!value.TryGetInt64(out iidsTracked)) return false; + break; + case CommonRepliesHash.iids_added.Hash when CommonRepliesHash.iids_added.Is(hash, key): + if (!value.TryGetInt64(out iidsAdded)) return false; + break; + case CommonRepliesHash.iids_duplicates.Hash when CommonRepliesHash.iids_duplicates.Is(hash, key): + if (!value.TryGetInt64(out iidsDuplicates)) return false; + break; } } @@ -2584,7 +2612,16 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes groups: checked((int)groups), firstEntry: firstEntry, lastEntry: lastEntry, - lastGeneratedId: lastGeneratedId); + lastGeneratedId: lastGeneratedId, + maxDeletedEntryId: maxDeletedEntryId, + entriesAdded: entriesAdded, + recordedFirstEntryId: recordedFirstEntryId, + idmpDuration: idmpDuration, + idmpMaxSize: idmpMaxsize, + pidsTracked: pidsTracked, + iidsTracked: iidsTracked, + iidsAdded: iidsAdded, + iidsDuplicates: iidsDuplicates); SetResult(message, streamInfo); return true; diff --git a/src/StackExchange.Redis/StreamConfiguration.cs b/src/StackExchange.Redis/StreamConfiguration.cs new file mode 100644 index 000000000..71bbe483e --- /dev/null +++ b/src/StackExchange.Redis/StreamConfiguration.cs @@ -0,0 +1,20 @@ +using System.Diagnostics.CodeAnalysis; + +namespace StackExchange.Redis; + +/// +/// Configuration parameters for a stream, for example idempotent producer (IDMP) duration and maxsize. +/// +[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] +public sealed class StreamConfiguration +{ + /// + /// How long the server remembers each iid, in seconds. + /// + public long? IdmpDuration { get; set; } + + /// + /// Maximum number of iids the server remembers per pid. + /// + public long? IdmpMaxSize { get; set; } +} diff --git a/src/StackExchange.Redis/StreamIdempotentId.cs b/src/StackExchange.Redis/StreamIdempotentId.cs new file mode 100644 index 000000000..601890d1f --- /dev/null +++ b/src/StackExchange.Redis/StreamIdempotentId.cs @@ -0,0 +1,82 @@ +using System; +using System.Diagnostics.CodeAnalysis; + +namespace StackExchange.Redis; + +/// +/// The idempotent id for a stream entry, ensuring at-most-once production. Each producer should have a unique +/// that is stable and consistent between runs. When adding stream entries, the +/// caller can specify an that is unique and repeatable for a given data item, or omit it +/// and let the server generate it from the content of the data item. In either event: duplicates are rejected. +/// +[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)] +public readonly struct StreamIdempotentId +{ + // note: if exposing wider, maybe expose as a by-ref property rather than a readonly field + internal static readonly StreamIdempotentId Empty = default; + + /// + /// Create a new with the given producer id. + /// + public StreamIdempotentId(RedisValue producerId) + { + if (producerId.IsNull) throw new ArgumentNullException(nameof(producerId)); + ProducerId = producerId; + IdempotentId = RedisValue.Null; + } + + /// + /// The idempotent id for a stream entry, ensuring at-most-once production. + /// + public StreamIdempotentId(RedisValue producerId, RedisValue idempotentId) + { + if (!producerId.HasValue) throw new ArgumentNullException(nameof(producerId)); + ProducerId = producerId; + IdempotentId = idempotentId; // can be explicit null, fine + } + + /// + /// The producer of the idempotent id; this is fixed for a given data generator. + /// + public RedisValue ProducerId { get; } + + /// + /// The optional idempotent id; this should be unique for a given data item. If omitted / null, + /// the server will generate the idempotent id from the content of the data item. + /// + public RedisValue IdempotentId { get; } + + /// + public override string ToString() + { + if (IdempotentId.HasValue) return $"IDMP {ProducerId} {IdempotentId}"; + if (ProducerId.HasValue) return $"IDMPAUTO {ProducerId}"; + return ""; + } + + internal int ArgCount => IdempotentId.HasValue ? 3 : ProducerId.HasValue ? 2 : 0; + + internal void WriteTo(RedisValue[] args, ref int index) + { + if (IdempotentId.HasValue) + { + args[index++] = RedisLiterals.IDMP; + args[index++] = ProducerId; + args[index++] = IdempotentId; + } + else if (ProducerId.HasValue) + { + args[index++] = RedisLiterals.IDMPAUTO; + args[index++] = ProducerId; + } + } + + /// + public override int GetHashCode() => ProducerId.GetHashCode() ^ IdempotentId.GetHashCode(); + + /// + public override bool Equals(object? obj) => + obj is StreamIdempotentId other + && ProducerId == other.ProducerId + && IdempotentId == other.IdempotentId; +} diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 2419f673a..22d2a0159 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -81,6 +81,102 @@ public async Task StreamAddWithManualId() Assert.Equal(id, messageId); } + [Theory] + [InlineData(false, false, false)] + [InlineData(false, false, true)] + [InlineData(false, true, false)] + [InlineData(false, true, true)] + [InlineData(true, false, false)] + [InlineData(true, false, true)] + [InlineData(true, true, false)] + [InlineData(true, true, true)] + public async Task StreamAddIdempotentId(bool iid, bool pairs, bool async) + { + await using var conn = Create(require: RedisFeatures.v8_6_0); + var db = conn.GetDatabase(); + StreamIdempotentId id = iid ? new StreamIdempotentId("pid", "iid") : new StreamIdempotentId("pid"); + Log($"id: {id}"); + var key = Me(); + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + + async Task Add() + { + if (pairs) + { + NameValueEntry[] fields = [new("field1", "value1"), new("field2", "value2"), new("field3", "value3")]; + if (async) + { + return await db.StreamAddAsync(key, fields, idempotentId: id); + } + + return db.StreamAdd(key, fields, idempotentId: id); + } + + if (async) + { + return await db.StreamAddAsync(key, "field1", "value1", idempotentId: id); + } + + return db.StreamAdd(key, "field1", "value1", idempotentId: id); + } + + RedisValue first = await Add(); + Log($"Message ID: {first}"); + + RedisValue second = await Add(); + Assert.Equal(first, second); // idempotent id has avoided a duplicate + } + + [Theory] + [InlineData(null, null, false)] + [InlineData(null, 42, false)] + [InlineData(13, null, false)] + [InlineData(13, 42, false)] + [InlineData(null, null, true)] + [InlineData(null, 42, true)] + [InlineData(13, null, true)] + [InlineData(13, 42, true)] + public async Task StreamConfigure(int? duration, int? maxsize, bool async) + { + await using var conn = Create(require: RedisFeatures.v8_6_0); + var db = conn.GetDatabase(); + + var key = Me(); + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + var id = await db.StreamAddAsync(key, "field1", "value1"); + Log($"id: {id}"); + var settings = new StreamConfiguration { IdmpDuration = duration, IdmpMaxSize = maxsize }; + bool doomed = duration is null && maxsize is null; + if (async) + { + if (doomed) + { + var ex = await Assert.ThrowsAsync(async () => await db.StreamConfigureAsync(key, settings)); + Assert.StartsWith("ERR At least one parameter must be specified", ex.Message); + } + else + { + await db.StreamConfigureAsync(key, settings); + } + } + else + { + if (doomed) + { + var ex = Assert.Throws(() => db.StreamConfigure(key, settings)); + Assert.StartsWith("ERR At least one parameter must be specified", ex.Message); + } + else + { + db.StreamConfigure(key, settings); + } + } + var info = async ? await db.StreamInfoAsync(key) : db.StreamInfo(key); + const int SERVER_DEFAULT = 100; + Assert.Equal(duration ?? SERVER_DEFAULT, info.IdmpDuration); + Assert.Equal(maxsize ?? SERVER_DEFAULT, info.IdmpMaxSize); + } + [Fact] public async Task StreamAddMultipleValuePairsWithManualId() { @@ -1562,16 +1658,51 @@ public async Task StreamInfoGet() var id1 = db.StreamAdd(key, "field1", "value1"); db.StreamAdd(key, "field2", "value2"); - db.StreamAdd(key, "field3", "value3"); - var id4 = db.StreamAdd(key, "field4", "value4"); - + var id3 = db.StreamAdd(key, "field3", "value3"); + db.StreamAdd(key, "field4", "value4"); + var id5 = db.StreamAdd(key, "field5", "value5"); + db.StreamDelete(key, [id3]); var streamInfo = db.StreamInfo(key); Assert.Equal(4, streamInfo.Length); Assert.True(streamInfo.RadixTreeKeys > 0); Assert.True(streamInfo.RadixTreeNodes > 0); Assert.Equal(id1, streamInfo.FirstEntry.Id); - Assert.Equal(id4, streamInfo.LastEntry.Id); + Assert.Equal(id5, streamInfo.LastEntry.Id); + + var server = conn.GetServer(conn.GetEndPoints().First()); + Log($"server version: {server.Version}"); + if (server.Version.IsAtLeast(RedisFeatures.v7_0_0_rc1)) + { + Assert.Equal(id3, streamInfo.MaxDeletedEntryId); + Assert.Equal(5, streamInfo.EntriesAdded); + Assert.False(streamInfo.RecordedFirstEntryId.IsNull); + } + else + { + Assert.True(streamInfo.MaxDeletedEntryId.IsNull); + Assert.Equal(-1, streamInfo.EntriesAdded); + Assert.True(streamInfo.RecordedFirstEntryId.IsNull); + } + + if (server.Version.IsAtLeast(RedisFeatures.v8_6_0)) + { + Assert.True(streamInfo.IdmpDuration > 0); + Assert.True(streamInfo.IdmpMaxSize > 0); + Assert.Equal(0, streamInfo.PidsTracked); + Assert.Equal(0, streamInfo.IidsTracked); + Assert.Equal(0, streamInfo.IidsDuplicates); + Assert.Equal(0, streamInfo.IidsAdded); + } + else + { + Assert.Equal(-1, streamInfo.IdmpDuration); + Assert.Equal(-1, streamInfo.IdmpMaxSize); + Assert.Equal(-1, streamInfo.PidsTracked); + Assert.Equal(-1, streamInfo.IidsTracked); + Assert.Equal(-1, streamInfo.IidsDuplicates); + Assert.Equal(-1, streamInfo.IidsAdded); + } } [Fact] @@ -2188,26 +2319,34 @@ public void StreamTrimByMinIdWithApproximateAndLimit(StreamTrimMode mode) var db = conn.GetDatabase(); var key = Me() + ":" + mode; - const int maxLength = 1000; - const int limit = 100; + const int maxLength = 100; + const int limit = 10; + // The behavior of ACKED etc is undefined when there are no consumer groups; or rather, + // it *is* defined, but it is defined/implemented differently < and >= server 8.6 + // This *does* have the side-effect that the 3 modes behave the same in this test, + // but: we're trying to test the API, not the server. + const string groupName = "test_group", consumer = "consumer"; + db.StreamCreateConsumerGroup(key, groupName, StreamPosition.NewMessages); for (var i = 0; i < maxLength; i++) { db.StreamAdd(key, $"field", $"value", 1111111110 + i); } + var entries = db.StreamReadGroup( + key, + groupName, + consumer, + StreamPosition.NewMessages); + + Assert.Equal(maxLength, entries.Length); + var numRemoved = db.StreamTrimByMinId(key, 1111111110 + maxLength, useApproximateMaxLength: true, limit: limit, mode: mode); - var expectRemoved = mode switch - { - StreamTrimMode.KeepReferences => limit, - StreamTrimMode.DeleteReferences => 0, - StreamTrimMode.Acknowledged => 0, - _ => throw new ArgumentOutOfRangeException(nameof(mode)), - }; + const int EXPECT_REMOVED = 0; var len = db.StreamLength(key); - Assert.Equal(expectRemoved, numRemoved); - Assert.Equal(maxLength - expectRemoved, len); + Assert.Equal(EXPECT_REMOVED, numRemoved); + Assert.Equal(maxLength - EXPECT_REMOVED, len); } [Fact]