Signal-Based Step Completion
The IBulkStepSignalService provides a programmatic API for signaling completion of signal-mode async steps. This is the mechanism that bridges external systems (webhooks, message queues, manual approvals) to the BulkSharp processing pipeline.
For async step basics and polling mode, see Async Steps.
IBulkStepSignalService API
public interface IBulkStepSignalService
{
bool TrySignal(string signalKey);
bool TrySignalFailure(string signalKey, string errorMessage);
}
Both methods return true if a waiter was found and signaled, false if no step is waiting on that key.
- TrySignal: Marks the step as completed successfully. The pipeline proceeds to the next step.
- TrySignalFailure: Marks the step as failed with the provided error message. The error is recorded and the row is marked as failed.
The service is registered as a singleton. Inject it anywhere you need to signal step completion.
How Signal Keys Work
Signal keys are scoped to prevent collisions across operations and rows. When your step returns a key from GetSignalKey, BulkSharp builds the internal key as:
Internal key = "{operationId}:{userKey}:{rowNumber}"
The row number is appended automatically, so even if two rows return the same user key, their internal keys are unique. This means GetSignalKey does not need to include the row identifier — though including it is still recommended for clarity in logs and external systems.
Your step defines the user-facing portion:
public string GetSignalKey(Row row, Metadata meta) => $"order-{row.OrderId}";
When signaling via the REST API, pass only the user portion — the endpoint matches by prefix:
POST /api/bulks/{operationId}/signal/order-{orderId}
If multiple rows share the same user key (e.g., all rows use "approval"), the signal endpoint completes the first waiting row (FIFO). Each subsequent signal call completes the next waiting row.
When signaling via the programmatic API, you need the full scoped key (including row number):
signalService.TrySignal($"{operationId}:order-{orderId}:{rowNumber}");
Or query the DB for the signal key:
var status = await statusRepo.GetBySignalKeyAsync(fullScopedKey, ct);
signalService.TrySignal(status.SignalKey);
Signal Flow
- Step executor calls
ExecuteAsyncon the async step (kicks off external work) SignalCompletionHandlerregisters aTaskCompletionSourcein the in-process signal registry using the scoped key- The row enters
WaitingForCompletionstate and is persisted to the DB - An external system calls
TrySignal(in-process) or the REST endpoint - The
TaskCompletionSourcecompletes, the step executor resumes - If
Timeoutelapses before a signal arrives, the step fails withBulkStepTimeoutException
Cross-Process Signals (API + Worker Architecture)
In a split API + Worker deployment, the API process and Worker process don't share memory. The in-process TrySignal() on the API side cannot reach the Worker's TaskCompletionSource.
BulkSharp handles this automatically:
- The signal REST endpoint first tries the in-process signal (same-process case)
- If no in-process waiter is found (cross-process case), it writes the completion state directly to the DB (
BulkRowStepStatus.State = Completed) - The Worker's
SignalCompletionHandlerpolls the DB every 2 seconds alongside the in-process waiter - When the DB state changes to
CompletedorFailed, the Worker picks it up and resumes the step
This means signal endpoints work identically whether the API and Worker are in the same process or separate processes. No configuration needed.
Integration Patterns
Webhook Callback
Register an endpoint that receives callbacks from an external system. Use the dashboard's built-in signal endpoint — it handles scoping and cross-process signals:
app.MapPost("/webhooks/payment-processor", async (
[FromBody] PaymentWebhook webhook,
HttpClient http) =>
{
// Forward to BulkSharp signal endpoint (handles scoping + cross-process)
var signalUrl = $"/api/bulks/{webhook.BulkOperationId}/signal/payment-{webhook.OrderId}";
if (webhook.Status == "completed")
{
await http.PostAsync(signalUrl, null);
}
else
{
await http.PostAsJsonAsync($"{signalUrl}/fail",
new { ErrorMessage = $"Payment failed: {webhook.Reason}" });
}
return Results.Ok();
});
The async step that initiates the payment would pass the callback URL with the operation ID and signal key embedded:
public class PaymentStep : IAsyncBulkStep<OrderMetadata, OrderRow>
{
public string Name => "Process Payment";
public StepCompletionMode CompletionMode => StepCompletionMode.Signal;
public TimeSpan Timeout => TimeSpan.FromMinutes(30);
public TimeSpan PollInterval => TimeSpan.Zero;
public int MaxRetries => 1;
private readonly IPaymentClient _paymentClient;
public PaymentStep(IPaymentClient paymentClient)
=> _paymentClient = paymentClient;
public async Task ExecuteAsync(OrderRow row, OrderMetadata meta, CancellationToken ct)
{
await _paymentClient.InitiatePaymentAsync(new PaymentRequest
{
OrderId = row.OrderId,
Amount = row.Amount,
CallbackUrl = $"{meta.BaseUrl}/webhooks/payment-processor",
CallbackMetadata = new { BulkOperationId = meta.OperationId, OrderId = row.OrderId }
}, ct);
}
public Task<bool> CheckCompletionAsync(OrderRow row, OrderMetadata meta, CancellationToken ct)
=> Task.FromResult(false);
public string GetSignalKey(OrderRow row, OrderMetadata meta) => $"payment-{row.OrderId}";
}
Message Queue Consumer (Same-Process Worker)
When the consumer runs in the same process as the Worker, use IBulkStepSignalService directly. You need the full scoped key — query the DB for waiting statuses:
public class ApprovalQueueConsumer : BackgroundService
{
private readonly IBulkStepSignalService _signalService;
private readonly IServiceProvider _serviceProvider;
private readonly IMessageQueue _queue;
protected override async Task ExecuteAsync(CancellationToken ct)
{
await foreach (var message in _queue.ConsumeAsync("approval-responses", ct))
{
using var scope = _serviceProvider.CreateScope();
var rowRecordRepo = scope.ServiceProvider.GetRequiredService<IBulkRowRecordRepository>();
// Find the waiting step by querying the DB
var waiting = await rowRecordRepo.QueryAsync(new BulkRowRecordQuery
{
OperationId = message.OperationId,
State = RowRecordState.WaitingForCompletion
}, ct);
var record = waiting.Items.FirstOrDefault(r =>
r.SignalKey?.Contains($"approval-{message.TicketId}:") == true);
if (record?.SignalKey != null)
{
if (message.Approved)
_signalService.TrySignal(record.SignalKey);
else
_signalService.TrySignalFailure(record.SignalKey, message.DenialReason);
}
await _queue.AcknowledgeAsync(message, ct);
}
}
}
Message Queue Consumer (Separate Process)
When the consumer runs in a different process than the Worker, use the REST signal endpoint instead — it handles cross-process DB-backed signaling automatically:
protected override async Task ExecuteAsync(CancellationToken ct)
{
await foreach (var message in _queue.ConsumeAsync("approval-responses", ct))
{
var signalUrl = $"{_bulkSharpApiUrl}/api/bulks/{message.OperationId}/signal/approval-{message.TicketId}";
if (message.Approved)
await _http.PostAsync(signalUrl, null, ct);
else
await _http.PostAsJsonAsync($"{signalUrl}/fail",
new { ErrorMessage = message.DenialReason }, ct);
await _queue.AcknowledgeAsync(message, ct);
}
}
### REST API (Dashboard Endpoints)
The dashboard exposes built-in signal endpoints - no custom code needed:
```bash
# Signal success
curl -X POST http://localhost:5000/api/bulks/{operationId}/signal/{key}
# Signal failure
curl -X POST http://localhost:5000/api/bulks/{operationId}/signal/{key}/fail \
-H "Content-Type: application/json" \
-d '{"errorMessage": "Approval denied"}'
These endpoints look up the step status by signal key, verify it is still waiting, then call TrySignal or TrySignalFailure.
Orphaned Step Recovery
Signal-mode steps hold an in-process TaskCompletionSource. If the application restarts while steps are waiting, those waiters are lost and the rows become permanently stuck in WaitingForCompletion.
Enable orphaned step recovery to handle this:
builder.Services.AddBulkSharp(bulk => bulk
.ConfigureOptions(opts => opts.EnableOrphanedStepRecovery = true));
On startup, OrphanedStepRecoveryService (a hosted service) runs automatically:
- Queries all
Runningoperations - Finds rows in
WaitingForCompletionstate older than 5 minutes - Transitions them to
Failedwith the message: "Recovery: step was waiting for signal when application restarted" - Marks the parent operation as failed
It also detects stuck Running operations with no waiting rows (e.g., the app crashed mid-processing) and marks them as failed.
If the database is not yet available (first startup before migrations), the recovery service logs a warning and exits gracefully without throwing.
When to enable: Production deployments with signal-mode steps where restarts are possible. Disable for development or when using only polling-mode steps.
Complete Example
A shipment processing operation that waits for carrier confirmation:
[BulkOperation("shipment-tracking")]
public class ShipmentOperation : IBulkPipelineOperation<ShipmentMetadata, ShipmentRow>
{
public Task ValidateMetadataAsync(ShipmentMetadata meta, CancellationToken ct)
=> Task.CompletedTask;
public Task ValidateRowAsync(ShipmentRow row, ShipmentMetadata meta, CancellationToken ct)
{
if (string.IsNullOrEmpty(row.TrackingNumber))
throw new ValidationException("Tracking number is required");
return Task.CompletedTask;
}
public IEnumerable<IBulkStep<ShipmentMetadata, ShipmentRow>> GetSteps()
{
yield return new SubmitToCarrierStep();
yield return new WaitForPickupStep(); // Signal-mode async step
yield return new UpdateInventoryStep();
}
}
public class WaitForPickupStep : IAsyncBulkStep<ShipmentMetadata, ShipmentRow>
{
public string Name => "Wait for Pickup";
public int MaxRetries => 0;
public StepCompletionMode CompletionMode => StepCompletionMode.Signal;
public TimeSpan PollInterval => TimeSpan.Zero;
public TimeSpan Timeout => TimeSpan.FromHours(48);
public Task ExecuteAsync(ShipmentRow row, ShipmentMetadata meta, CancellationToken ct)
{
// Register webhook with carrier - carrier will call back at /webhooks/carrier
return Task.CompletedTask;
}
public Task<bool> CheckCompletionAsync(ShipmentRow row, ShipmentMetadata meta, CancellationToken ct)
=> Task.FromResult(false);
public string GetSignalKey(ShipmentRow row, ShipmentMetadata meta)
=> $"pickup-{row.TrackingNumber}";
}
Webhook endpoint (using built-in signal API for cross-process support):
app.MapPost("/webhooks/carrier", async (
[FromBody] CarrierEvent evt,
HttpClient http) =>
{
// Use the built-in signal endpoint — works across processes
await http.PostAsync(
$"/api/bulks/{evt.OperationId}/signal/pickup-{evt.TrackingNumber}", null);
return Results.Ok();
});
Startup configuration:
builder.Services.AddBulkSharp(bulk => bulk
.ConfigureOptions(opts =>
{
opts.MaxRowConcurrency = 10;
opts.EnableOrphanedStepRecovery = true;
})
.AddOperationsFromAssemblyOf<ShipmentOperation>()
);