Configuration
BulkSharp is configured through the AddBulkSharp builder and BulkSharpOptions.
Builder API
services.AddBulkSharp(builder => builder
.ConfigureOptions(opts => { /* BulkSharpOptions */ })
.UseFileStorage(fs => { /* file storage */ })
.UseMetadataStorage(ms => { /* metadata storage */ })
.UseScheduler(s => { /* scheduler */ }));
Each axis has a default if not configured:
- File storage: Local filesystem (
bulksharp-storage/) - Metadata storage: In-memory
- Scheduler: Channels-based background processing
BulkSharpOptions
services.AddBulkSharp(builder => builder
.ConfigureOptions(opts =>
{
opts.MaxFileSizeBytes = 100 * 1024 * 1024; // 100 MB (default)
opts.MaxRowConcurrency = 1; // Sequential (default)
opts.FlushBatchSize = 100; // Rows between flushes (default)
opts.IncludeRowDataInErrors = false; // PII risk if true (default)
opts.EnableOrphanedStepRecovery = false; // For signal-based steps (default)
}));
| Option | Type | Default | Description |
|---|---|---|---|
MaxFileSizeBytes |
long |
104857600 (100 MB) |
Maximum upload file size. Set to 0 to disable. |
MaxRowConcurrency |
int |
1 |
Maximum rows processed in parallel. See Parallel Processing. |
FlushBatchSize |
int |
100 |
Number of rows between progress flushes (error batch writes + status updates). |
IncludeRowDataInErrors |
bool |
false |
Whether to serialize row data into error records. Warning: may contain PII. |
EnableOrphanedStepRecovery |
bool |
false |
Recover rows stuck in WaitingForCompletion after restart. Only needed for signal-based async steps. |
MaxRetryAttempts |
int |
10 |
Maximum number of operation-level retry attempts. Set to 0 to disable the limit. See Retry Guide. |
Convenience Methods
// All defaults (file system + in-memory metadata + Channels scheduler)
services.AddBulkSharp();
// All in-memory + immediate scheduler (testing)
services.AddBulkSharpInMemory();
// API-only: no worker threads, no hosted services, operations stay Pending
services.AddBulkSharpApi();
// API-only with custom storage
services.AddBulkSharpApi(builder => builder
.UseFileStorage(fs => fs.UseS3(opts => opts.BucketName = "uploads"))
.UseMetadataStorage(ms => ms.UseSqlServer(opts =>
opts.ConnectionString = connectionString)));
AddBulkSharpApi() registers IBulkOperationService, IBulkOperationQueryService, storage providers, and data format processors. It uses a built-in NullBulkScheduler that leaves operations in Pending status for a separate Worker process. See API + Worker Architecture.
File Storage Options
| Method | Description |
|---|---|
fs.UseFileSystem(basePath?) |
Local filesystem. Default path: bulksharp-storage |
fs.UseInMemory() |
In-memory storage (testing only) |
fs.UseS3(opts => ...) |
Amazon S3. Requires BulkSharp.Files.S3 package. |
fs.UseCustom<T>() |
Custom IFileStorageProvider implementation |
Metadata Storage Options
| Method | Description |
|---|---|
ms.UseInMemory() |
In-memory repositories (default) |
For SQL Server persistence, register separately:
// SQL Server with typed options
services.AddBulkSharpSqlServer(opts =>
{
opts.ConnectionString = connectionString;
opts.MaxRetryCount = 5;
opts.MaxRetryDelay = TimeSpan.FromSeconds(30);
});
// Custom DbContext
services.AddBulkSharpEntityFramework<MyDbContext>(options =>
options.UseSqlServer(connectionString));
Requires the BulkSharp.Data.EntityFramework package.
Scheduler Options
| Method | Description |
|---|---|
s.UseChannels(opts?) |
Background processing via System.Threading.Channels (default) |
s.UseImmediate() |
Synchronous inline execution (testing only) |
s.UseCustom<T>() |
Custom IBulkScheduler implementation |
The Channels scheduler accepts:
s.UseChannels(opts =>
{
opts.WorkerCount = 4; // Default: 4
opts.PendingPollInterval = TimeSpan.FromSeconds(5); // Default: null (disabled)
});
| Option | Type | Default | Description |
|---|---|---|---|
WorkerCount |
int |
4 |
Concurrent operation workers. |
QueueCapacity |
int |
1000 |
Bounded channel capacity. |
ShutdownTimeout |
TimeSpan |
30s |
Grace period before force-cancelling workers. |
PendingPollInterval |
TimeSpan? |
null |
Interval for polling the database for new Pending operations. null disables polling. Set this when using a separate API process with AddBulkSharpApi(). |
StuckOperationTimeout |
TimeSpan? |
null |
Operations stuck in Running beyond this duration are marked Failed on startup and each poll cycle. null disables. Set this in Worker processes to recover from crashes. |
WorkerCount controls how many operations process concurrently (not rows - see MaxRowConcurrency for row-level parallelism).
Event Hooks
Event handlers, validators, and processors are auto-discovered from scanned assemblies. Just implement the interface — no manual registration needed:
// Auto-discovered: just create the class and it's registered
public class EmailNotificationHandler : IBulkOperationEventHandler
{
public async Task OnOperationCompletedAsync(BulkOperationCompletedEvent e, CancellationToken ct)
{
// Send email notification
}
public async Task OnOperationFailedAsync(BulkOperationFailedEvent e, CancellationToken ct)
{
// Send failure alert
}
}
Available events: OnOperationCreatedAsync, OnStatusChangedAsync, OnOperationCompletedAsync, OnOperationFailedAsync, OnRowFailedAsync.
Handlers run in parallel. A failing handler is logged but never blocks processing.
For explicit control, you can still use builder.AddEventHandler<T>():
services.AddBulkSharp(builder => builder
.AddEventHandler<EmailNotificationHandler>());
Auto-Discovery
BulkSharp scans assemblies for implementations of these extensibility interfaces and registers them in DI automatically:
| Interface | Purpose | Runs |
|---|---|---|
IBulkOperationEventHandler |
Lifecycle events (created, completed, failed) | On state transitions |
IBulkMetadataValidator<TMetadata> |
Validates operation metadata | Before processing starts |
IBulkRowValidator<TMetadata, TRow> |
Cross-cutting row validation | Before each row is processed |
IBulkRowProcessor<TMetadata, TRow> |
Post-processing hook per row | After each row is processed |
Multiple implementations of each interface can coexist — they all run. This enables composable cross-cutting concerns without modifying operation code:
// Validates department is from an allowed list — auto-discovered, runs before operation starts
public class DepartmentAllowlistValidator : IBulkMetadataValidator<UserImportMetadata>
{
public Task ValidateAsync(UserImportMetadata metadata, CancellationToken ct = default)
{
if (!AllowedDepartments.Contains(metadata.Department))
throw new ArgumentException($"Department '{metadata.Department}' is not allowed");
return Task.CompletedTask;
}
}
// Validates email domain per row — auto-discovered, runs before each row
public class CorporateEmailValidator : IBulkRowValidator<UserImportMetadata, UserImportRow>
{
public Task ValidateAsync(UserImportRow row, UserImportMetadata metadata, CancellationToken ct = default)
{
var domain = row.Email.Split('@').LastOrDefault();
if (domain != "example.com")
throw new ArgumentException($"Email domain '{domain}' is not allowed");
return Task.CompletedTask;
}
}
// Audit log after each row — auto-discovered, runs after processing
public class AuditProcessor(ILogger<AuditProcessor> logger)
: IBulkRowProcessor<UserImportMetadata, UserImportRow>
{
public Task ProcessAsync(UserImportRow row, UserImportMetadata metadata, CancellationToken ct = default)
{
logger.LogInformation("[audit] Imported {Email} by {ImportedBy}", row.Email, metadata.ImportedBy);
return Task.CompletedTask;
}
}
Assembly scanning uses the same scope as operation discovery — either all loaded assemblies (default) or the assemblies specified via AddOperationsFromAssembly().
Export Formatter
By default, BulkSharp uses a built-in formatter for CSV and JSON exports. To customize export output, register a custom IBulkExportFormatter:
services.AddBulkSharp(builder => builder
.UseExportFormatter<CustomExportFormatter>());
See Export Guide for details on implementing custom formatters.
Row Tracking
BulkSharp tracks every row's status via BulkRowRecord entries. By default, only status, timestamps, and errors are stored. To also persist the raw row data, set TrackRowData on the operation attribute:
[BulkOperation("import-users", TrackRowData = true)]
public class UserImportOperation : IBulkRowOperation<UserMetadata, UserRow> { ... }
Query row records via IBulkRowRecordRepository:
var rows = await rowRecordRepo.QueryAsync(new BulkRowRecordQuery
{
OperationId = operationId,
ErrorsOnly = true,
Page = 1,
PageSize = 50
});