2 changes: 1 addition & 1 deletion .gitignore
@@ -100,4 +100,4 @@ src/scaffolding.config
*.sln.iml

# Visual Studio Code
-.vscode
+.vscode
3 changes: 3 additions & 0 deletions src/Directory.Packages.props
@@ -7,6 +7,7 @@
<PackageVersion Include="Autofac" Version="9.0.0" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.7" />
<PackageVersion Include="Azure.Identity" Version="1.17.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.27.0" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="ByteSize" Version="2.1.2" />
@@ -19,6 +20,7 @@
<PackageVersion Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.3" />
<PackageVersion Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="10.0.3" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="10.0.3" />
<PackageVersion Include="MongoDB.Driver" Version="3.6.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="10.0.3" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="10.0.3" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="10.0.3" />
@@ -80,6 +82,7 @@
<PackageVersion Include="System.Reactive" Version="6.1.0" />
<PackageVersion Include="System.Reflection.MetadataLoadContext" Version="10.0.3" />
<PackageVersion Include="System.ServiceProcess.ServiceController" Version="10.0.3" />
<PackageVersion Include="Testcontainers.MongoDb" Version="4.3.0" />
<PackageVersion Include="Validar.Fody" Version="1.9.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.3.0" />
</ItemGroup>
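Note: this file uses NuGet Central Package Management, so each version is pinned once in Directory.Packages.props and consuming projects reference the package with no Version attribute. A minimal sketch of what a consuming .csproj entry looks like (illustrative, not part of this diff):

<ItemGroup>
  <PackageReference Include="MongoDB.Driver" />
</ItemGroup>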
1 change: 1 addition & 0 deletions src/ProjectReferences.Persisters.Audit.props
@@ -3,6 +3,7 @@
<ItemGroup Label="Persisters">
<ProjectReference Include="..\ServiceControl.Audit.Persistence.InMemory\ServiceControl.Audit.Persistence.InMemory.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Audit.Persistence.RavenDB\ServiceControl.Audit.Persistence.RavenDB.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Audit.Persistence.MongoDB\ServiceControl.Audit.Persistence.MongoDB.csproj" ReferenceOutputAssembly="false" Private="false" />
</ItemGroup>

</Project>
134 changes: 134 additions & 0 deletions src/ServiceControl.Audit.Persistence.MongoDB/BodyStorage/AzureBlobBodyStorage.cs
@@ -0,0 +1,134 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;
    using Auditing.BodyStorage;
    using Azure;
    using Azure.Storage.Blobs;
    using Azure.Storage.Blobs.Models;
    using Microsoft.Extensions.Logging;

    /// <summary>
    /// Body storage implementation that uses Azure Blob Storage to store message bodies. Each body is stored as a separate blob, with metadata for content type and size.
    /// The implementation retries transient upload failures with exponential backoff and uses a batched writer to optimize throughput when storing large volumes of messages.
    /// </summary>
    class AzureBlobBodyStorage(
        Channel<BodyWriteItem> channel,
        MongoSettings settings,
        ILogger<AzureBlobBodyStorage> logger)
        : BatchedBodyStorageWriter<BodyWriteItem>(channel, settings, logger), IBodyStorage, IBodyWriter
    {
        const int MaxRetries = 3;
        readonly BlobContainerClient containerClient = new(settings.BlobConnectionString, settings.BlobContainerName);

        protected override string WriterName => "Azure Blob body storage writer";

        // Initialization

        public async Task Initialize(CancellationToken cancellationToken)
        {
            _ = await containerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
            logger.LogInformation("Azure Blob body storage initialized. Container: {ContainerName}", containerClient.Name);
        }

        // IBodyWriter

        public bool IsEnabled => true;

        public async ValueTask WriteAsync(string id, string contentType, ReadOnlyMemory<byte> body, DateTime expiresAt, CancellationToken cancellationToken)
        {
            await WriteToChannelAsync(new BodyWriteItem
            {
                Id = id,
                ContentType = contentType,
                BodySize = body.Length,
                Body = body.ToArray(),
                ExpiresAt = expiresAt
            }, cancellationToken).ConfigureAwait(false);
        }

        // IBodyStorage

        // Bodies are written through the batched IBodyWriter path above, so this
        // synchronous Store is a no-op.
        public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
            => Task.CompletedTask;

        public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
        {
            var blobClient = containerClient.GetBlobClient(bodyId);

            try
            {
                var response = await blobClient.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
                var details = response.Value.Details;

                var bodySize = 0;
                if (details.Metadata.TryGetValue("bodySize", out var bodySizeStr))
                {
                    _ = int.TryParse(bodySizeStr, out bodySize);
                }

                return new StreamResult
                {
                    HasResult = true,
                    Stream = response.Value.Content,
                    ContentType = details.ContentType ?? "text/plain",
                    BodySize = bodySize,
                    Etag = details.ETag.ToString()
                };
            }
            catch (RequestFailedException ex) when (ex.Status == 404)
            {
                return new StreamResult { HasResult = false };
            }
        }

        // BatchedBodyStorageWriter

        protected override async Task FlushBatchAsync(List<BodyWriteItem> batch, CancellationToken cancellationToken)
        {
            var uploadTasks = batch.Select(entry => UploadBlobWithRetry(entry, cancellationToken));
            await Task.WhenAll(uploadTasks).ConfigureAwait(false);
        }

        async Task UploadBlobWithRetry(BodyWriteItem entry, CancellationToken cancellationToken)
        {
            var blobClient = containerClient.GetBlobClient(entry.Id);

            for (var attempt = 1; attempt <= MaxRetries; attempt++)
            {
                try
                {
                    using var stream = new MemoryStream(entry.Body);
                    var options = new BlobUploadOptions
                    {
                        HttpHeaders = new BlobHttpHeaders { ContentType = entry.ContentType.Trim() },
                        Metadata = new Dictionary<string, string>
                        {
                            ["messageId"] = entry.Id.Trim(),
                            ["bodySize"] = entry.BodySize.ToString(),
                            ["mongoExpiresAt"] = entry.ExpiresAt.ToString("O")
                        }
                    };
                    _ = await blobClient.UploadAsync(stream, options, cancellationToken).ConfigureAwait(false);
                    return;
                }
                catch (Exception ex) when (attempt < MaxRetries && !cancellationToken.IsCancellationRequested)
                {
                    // Exponential backoff: 1s, 2s, 4s, ...
                    var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt - 1));
                    logger.LogWarning(ex, "Failed to upload blob {BlobId} (attempt {Attempt}/{MaxRetries}), retrying in {Delay}s",
                        entry.Id, attempt, MaxRetries, delay.TotalSeconds);
                    await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "Failed to upload blob {BlobId} after {MaxRetries} attempts", entry.Id, MaxRetries);
                    return; // Give up on this entry so the rest of the batch can still complete
                }
            }
        }
    }
}
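For orientation, a sketch of how this storage might be composed in the host. The extension-method name, namespace placement, and channel capacity are illustrative assumptions; the PR's actual registration code is not shown in this diff. What is grounded in the code above: the writer derives from BackgroundService, takes the shared Channel<BodyWriteItem>, and implements both IBodyStorage and IBodyWriter.

namespace ServiceControl.Audit.Persistence.MongoDB // illustrative placement
{
    using System.Threading.Channels;
    using Microsoft.Extensions.DependencyInjection;
    using ServiceControl.Audit.Auditing.BodyStorage; // assumed location of IBodyStorage/IBodyWriter
    using ServiceControl.Audit.Persistence.MongoDB.BodyStorage;

    static class BodyStorageRegistration // hypothetical name, not from this PR
    {
        public static IServiceCollection AddAzureBlobBodyStorage(this IServiceCollection services)
        {
            // Bounded input channel: TryWrite fails when it is full, which is what
            // triggers the backpressure warning in WriteToChannelAsync.
            services.AddSingleton(Channel.CreateBounded<BodyWriteItem>(100_000)); // capacity is an assumption

            // MongoSettings and logging are assumed to be registered elsewhere.
            services.AddSingleton<AzureBlobBodyStorage>();
            services.AddSingleton<IBodyStorage>(sp => sp.GetRequiredService<AzureBlobBodyStorage>());
            services.AddSingleton<IBodyWriter>(sp => sp.GetRequiredService<AzureBlobBodyStorage>());

            // The writer is a BackgroundService, so it must also run as a hosted service.
            services.AddHostedService(sp => sp.GetRequiredService<AzureBlobBodyStorage>());

            return services;
        }
    }
}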
207 changes: 207 additions & 0 deletions src/ServiceControl.Audit.Persistence.MongoDB/BodyStorage/BatchedBodyStorageWriter.cs
@@ -0,0 +1,207 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    /// <summary>
    /// Base class for body storage writers that batch write operations for improved performance.
    /// The batcher assembles batches of entries from the input channel and processes them in parallel using a configurable number of writer tasks.
    /// </summary>
    abstract class BatchedBodyStorageWriter<TEntry>(
        Channel<TEntry> channel,
        MongoSettings settings,
        ILogger logger)
        : BackgroundService
    {
        readonly int BatchSize = settings.BodyWriterBatchSize;
        readonly int ParallelWriters = settings.BodyWriterParallelWriters;
        readonly TimeSpan BatchTimeout = settings.BodyWriterBatchTimeout;
        const int BacklogWarningThreshold = 5_000;
        long totalWritten;
        DateTime lastBacklogWarning;
        DateTime lastBackpressureWarning;

        // Bounded to two pending batches per writer: enough to keep the writers busy
        // without buffering an unbounded number of bodies in memory.
        readonly Channel<List<TEntry>> batchChannel = Channel.CreateBounded<List<TEntry>>(
            new BoundedChannelOptions(settings.BodyWriterParallelWriters * 2)
            {
                SingleReader = false,
                SingleWriter = true,
                AllowSynchronousContinuations = false,
                FullMode = BoundedChannelFullMode.Wait
            });

        protected ChannelWriter<TEntry> WriteChannel => channel.Writer;

        protected async ValueTask WriteToChannelAsync(TEntry entry, CancellationToken cancellationToken)
        {
            if (channel.Writer.TryWrite(entry))
            {
                return;
            }

            if (DateTime.UtcNow - lastBackpressureWarning > TimeSpan.FromSeconds(10))
            {
                lastBackpressureWarning = DateTime.UtcNow;
                logger.LogWarning("{WriterName} channel is full (backlog: {Backlog}). Body writes are blocking ingestion until the writer catches up",
                    WriterName, channel.Reader.Count);
            }

            await channel.Writer.WriteAsync(entry, cancellationToken).ConfigureAwait(false);
        }

        protected abstract string WriterName { get; }

        protected abstract Task FlushBatchAsync(List<TEntry> batch, CancellationToken cancellationToken);

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            logger.LogInformation("{WriterName} started ({Writers} writers, batch size {BatchSize})", WriterName, ParallelWriters, BatchSize);

            var assemblerTask = Task.Run(() => BatchAssemblerLoop(stoppingToken), CancellationToken.None);

            var writerTasks = new Task[ParallelWriters];
            for (var i = 0; i < ParallelWriters; i++)
            {
                var writerId = i;
                writerTasks[i] = Task.Run(() => WriterLoop(writerId), CancellationToken.None);
            }

            try
            {
                await Task.WhenAll(writerTasks.Append(assemblerTask)).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected during shutdown
            }

            logger.LogInformation("{WriterName} stopped", WriterName);
        }

        async Task BatchAssemblerLoop(CancellationToken stoppingToken)
        {
            var batch = new List<TEntry>(BatchSize);

            try
            {
                while (await channel.Reader.WaitToReadAsync(stoppingToken).ConfigureAwait(false))
                {
                    while (batch.Count < BatchSize && channel.Reader.TryRead(out var entry))
                    {
                        batch.Add(entry);
                    }

                    if (batch.Count > 0 && batch.Count < BatchSize)
                    {
                        // Partial batch: wait up to BatchTimeout for more entries before dispatching
                        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                        timeoutCts.CancelAfter(BatchTimeout);
                        try
                        {
                            while (batch.Count < BatchSize)
                            {
                                if (!await channel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
                                {
                                    break;
                                }

                                while (batch.Count < BatchSize && channel.Reader.TryRead(out var entry))
                                {
                                    batch.Add(entry);
                                }
                            }
                        }
                        catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
                        {
                            // Timeout expired - dispatch partial batch
                        }
                    }

                    if (batch.Count > 0)
                    {
                        await batchChannel.Writer.WriteAsync(batch, stoppingToken).ConfigureAwait(false);
                        batch = [];
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutting down - drain the input channel into remaining batches.
                // Any batch whose dispatch above was cancelled is still in `batch`
                // and is re-dispatched here.
                while (channel.Reader.TryRead(out var entry))
                {
                    batch.Add(entry);

                    if (batch.Count >= BatchSize)
                    {
                        await batchChannel.Writer.WriteAsync(batch, CancellationToken.None).ConfigureAwait(false);
                        batch = [];
                    }
                }

                if (batch.Count > 0)
                {
                    await batchChannel.Writer.WriteAsync(batch, CancellationToken.None).ConfigureAwait(false);
                }
            }
            finally
            {
                // Completing the batch channel is what stops the writer loops.
                batchChannel.Writer.Complete();
            }
        }

        async Task WriterLoop(int writerId)
        {
            logger.LogDebug("{WriterName} writer {WriterId} started", WriterName, writerId);

            // The assembler owns shutdown: it observes the stopping token, drains the
            // input channel, and then completes batchChannel, which ends this loop.
            // Reading and flushing with CancellationToken.None lets already-assembled
            // batches complete during shutdown instead of being dropped, and avoids
            // writers exiting while the assembler is still dispatching final batches
            // into the bounded channel.
            await foreach (var batch in batchChannel.Reader.ReadAllAsync(CancellationToken.None).ConfigureAwait(false))
            {
                try
                {
                    await FlushBatchAsync(batch, CancellationToken.None).ConfigureAwait(false);
                    ReportBatchWritten(batch.Count);
                }
                catch (Exception ex)
                {
                    // Log and keep going; one failed batch should not stop the writer
                    logger.LogError(ex, "Failed to flush {Count} entries", batch.Count);
                }
            }

            logger.LogDebug("{WriterName} writer {WriterId} stopped", WriterName, writerId);
        }

        void ReportBatchWritten(int batchCount)
        {
            // Writer loops run in parallel, so update the counter atomically
            var total = Interlocked.Add(ref totalWritten, batchCount);
            var backlog = channel.Reader.Count;
            logger.LogDebug("{WriterName}: batch={BatchCount}, total={TotalWritten}, backlog={Backlog}",
                WriterName, batchCount, total, backlog);
            if (backlog > BacklogWarningThreshold && DateTime.UtcNow - lastBacklogWarning > TimeSpan.FromSeconds(10))
            {
                lastBacklogWarning = DateTime.UtcNow;
                logger.LogWarning("{WriterName} is not keeping up with ingestion. Channel backlog: {Backlog} items", WriterName, backlog);
            }
        }
    }
}
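To make the batching contract concrete, here is a minimal test-style sketch of a collecting subclass. It is hypothetical and not part of this PR; it assumes a MongoSettings instance whose BodyWriterBatchSize, BodyWriterParallelWriters, and BodyWriterBatchTimeout values can be set for the test.

namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage.Tests // illustrative
{
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;

    // Hypothetical subclass used to observe batching behavior
    class CollectingWriter(Channel<string> channel, MongoSettings settings, ILogger logger)
        : BatchedBodyStorageWriter<string>(channel, settings, logger)
    {
        public List<List<string>> Batches { get; } = [];

        protected override string WriterName => "Collecting writer";

        protected override Task FlushBatchAsync(List<string> batch, CancellationToken cancellationToken)
        {
            lock (Batches)
            {
                Batches.Add(batch); // writer loops run in parallel, so guard the shared list
            }

            return Task.CompletedTask;
        }
    }
}

With BodyWriterBatchSize = 100 and a short BodyWriterBatchTimeout, pushing 150 entries through WriteToChannelAsync and waiting past the timeout should yield one full batch of 100 followed by a partial batch of 50: the assembler fills a batch greedily, then dispatches whatever it has once the timeout elapses.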
14 changes: 14 additions & 0 deletions src/ServiceControl.Audit.Persistence.MongoDB/BodyStorage/BodyWriteItem.cs
@@ -0,0 +1,14 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
    using System;

    readonly struct BodyWriteItem
    {
        public required string Id { get; init; }
        public required string ContentType { get; init; }
        public required int BodySize { get; init; }
        public required byte[] Body { get; init; }
        public string? TextBody { get; init; } // optional; null when not set
        public required DateTime ExpiresAt { get; init; }
    }
}