You can find out more about event sourcing in the library documentation. This documentation is limited to bundle integration and configuration.
We provide a default configuration that should work for most projects.
A path must be specified for Event Sourcing to know where to look for your aggregates. If you want you can use glob patterns to specify multiple paths.
patchlevel_event_sourcing:
aggregates: '%kernel.project_dir%/src/*/Domain'Or use an array to specify multiple paths.
patchlevel_event_sourcing:
aggregates:
- '%kernel.project_dir%/src/Hotel/Domain'
- '%kernel.project_dir%/src/Room/Domain'The library will automatically register all classes marked with the #[Aggregate] attribute in the specified paths.
If you want to learn more about aggregates, read the library documentation.
A path must be specified for Event Sourcing to know where to look for your events. If you want you can use glob patterns to specify multiple paths.
patchlevel_event_sourcing:
events: '%kernel.project_dir%/src/*/Domain/Event'Or use an array to specify multiple paths.
patchlevel_event_sourcing:
events:
- '%kernel.project_dir%/src/Hotel/Domain/Event'
- '%kernel.project_dir%/src/Room/Domain/Event'If you want to learn more about events, read the library documentation.
If you want to implement custom headers for your application, you must specify the paths to look for those headers. If you want you can use glob patterns to specify multiple paths.
patchlevel_event_sourcing:
headers: '%kernel.project_dir%/src/*/Domain/Header'Or use an array to specify multiple paths.
patchlevel_event_sourcing:
headers:
- '%kernel.project_dir%/src/Hotel/Domain/Header'
- '%kernel.project_dir%/src/Room/Domain/Header'If you want to learn more about custom headers, read the library documentation.
You have to specify the connection url to the event store.
patchlevel_event_sourcing:
connection:
url: '%env(EVENTSTORE_URL)%'You can find out more about how to create a connection here
Per default, our event sourcing connection is not available to use in your application. But you can create a dedicated connection that you can use for your projections.
patchlevel_event_sourcing:
connection:
url: '%env(EVENTSTORE_URL)%'
provide_dedicated_connection: trueIf you use doctrine migrations, you should exclude you projection tables from the schema generation. The schema is managed by the subscription engine and should not be managed by doctrine.
You can autowire the connection in your services like this:
use Doctrine\DBAL\Connection;
class YourService
{
public function __construct(
private readonly Connection $projectionConnection,
) {
}
}If you have installed the doctrine bundle, you can also define the connection via doctrine and then use it in the store.
doctrine:
dbal:
connections:
eventstore:
url: '%env(EVENTSTORE_URL)%'
patchlevel_event_sourcing:
connection:
service: doctrine.dbal.eventstore_connectionDo not use the same connection for event sourcing and your projections, otherwise you may run into transaction problems.
If you want to use the same connection as doctrine orm, then you have to set the flag merge_orm_schema.
Otherwise you should avoid using the same connection as other tools.
You can find out more about the dbal configuration here.
If you are using Doctrine for your projections too, you need to create a dedicated connection for this.
You can do this by defining a new connection named projection in the doctrine.yaml file
and use the same connection url as for the event store.
doctrine:
dbal:
connections:
eventstore:
url: '%env(EVENTSTORE_URL)%'
projection:
url: '%env(EVENTSTORE_URL)%'
patchlevel_event_sourcing:
connection:
service: doctrine.dbal.eventstore_connectionYou should exclude your projection tables from the schema generation.
doctrine:
dbal:
schema_filter: ~^(projection_)~Then you can use this connection in your projections.
If you are using autowiring you can inject the right connection Connection $projectionConnection parameter name.
The prefix projection is used to identify the connection.
namespace App\Projection;
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
#[Projector('my_projection')]
class MyProjection
{
public function __construct(
private readonly Connection $projectionConnection,
) {
}
}The store and schema is configurable.
You can change the store type of the event store.
patchlevel_event_sourcing:
store:
type: 'in_memory'Following store types are available:
dbal_aggregate default (deprecated)dbal_stream recommendedin_memorycustomIf you use custom store type, you need to set the service id under patchlevel_event_sourcing.store.service.
You can change the table name of the event store.
patchlevel_event_sourcing:
store:
options:
table_name: 'my_event_store'For dbal_aggregate and dbal_stream store types you can activate the read only mode.
Readings are possible, but if you try to write, an exception StoreIsReadOnly is thrown.
patchlevel_event_sourcing:
store:
read_only: trueThis is useful if you have maintenance work on the event store and you want to avoid side effects.
You can also merge the schema with doctrine orm. You have to set the following flag for this:
patchlevel_event_sourcing:
store:
merge_orm_schema: trueIf you want to merge the schema, then the same doctrine connection must be used as with the doctrine orm. Otherwise errors may occur!
All schema relevant commands are removed if you activate this option. You should use the doctrine commands then.
If you want to learn more about store, read the library documentation.
Only available in in_memory store. If you want to reset the store after each kernel request, you can activate this option.
So you can avoid side effects between the tests.
patchlevel_event_sourcing:
store:
kernel_reset: trueIf you want to migrate from your current store to a new store, you can use the following configuration.
This register a new store and a new cli command event-sourcing:store:migrate.
You can define translators to translate the old events to the new store.
Here is an example for a migration from dbal_aggregate to dbal_stream.
patchlevel_event_sourcing:
store:
migrate_to_new_store:
type: 'dbal_stream'
options:
table_name: 'my_stream_store'
translators:
- Patchlevel\EventSourcing\Message\Translator\AggregateToStreamHeaderTranslatorMake sure that you use different table names for the old and new store. Otherwise your event store will be destroyed.
Set the read_only flag to true for the old store to avoid side effects
and missing events during the migration.
You can use doctrine migrations to manage the schema.
patchlevel_event_sourcing:
migration:
namespace: EventSourcingMigrations
path: "%kernel.project_dir%/migrations"You can find out more about subscriptions in the library documentation.
You can change where the subscription engine stores its necessary information about the subscription.
Default is dbal, which means it stores it in the same DB that is used by the dbal event store.
Otherwise you can choose between the following stores:
dbal defaultin_memorystatic_in_memorycustompatchlevel_event_sourcing:
subscription:
store:
type: 'custom' # default is 'dbal'
service: 'my_subscription_store'
options:
table_name: 'my_subscription_store'If you are using the doctrine-test-bundle,
you can use the static_in_memory store for testing.
If aggregates are used in the processors and new events are generated there,
then they are not part of the current subscription engine run and will only be processed during the next run or boot.
This is usually not a problem in dev or prod environment because a worker is used
and these events will be processed at some point. But in testing it is not so easy.
For this reason, you can activate the catch_up option.
patchlevel_event_sourcing:
subscription:
catch_up: trueYou can activate the throw_on_error option to throw an exception if a subscription engine run has an error.
This is useful for testing or development to get directly feedback if something is wrong.
patchlevel_event_sourcing:
subscription:
throw_on_error: trueThis option should not be used in production. The normal behavior is to log the error and continue.
If you want to run the subscription engine after an aggregate is saved, you can activate this option. This is useful for testing or development, so you don't have run a worker to process the events.
patchlevel_event_sourcing:
subscription:
run_after_aggregate_save: trueIf you want to automatically setup the subscription engine, you can activate this option. This is useful for development, so you don't have to setup the subscription engine manually.
patchlevel_event_sourcing:
subscription:
auto_setup: trueThis works only before each http requests and not if you use the console commands.
If you want to rebuild the subscription engine after a file change, you can activate this option. This is also useful for development, so you don't have to rebuild the projections manually.
patchlevel_event_sourcing:
subscription:
rebuild_after_file_change: trueThis works only before each http requests and not if you use the console commands.
This is using the cache system to store the latest file change time. You can change the cache pool with the cache_pool option.
Depending on the database you are using for the eventstore it may be happening that your subscriptions are skipping some events. This is due to how auto-increments work in these databases in combination with e.g. longer open transactions. Even when not working with longer open transactions, this may occur if load is high on the database. We already have a locking mechanism in place to prevent this behaviour which throttles write speed. Gap Detection operates different, it checks if a gap between the last message handled and the current message is present. If so it waits a reasonable amount of time and re-fetches the message. This results into slower updates for the subscriptions but creates more resilience.
patchlevel_event_sourcing:
subscription:
gap_detection: ~For more context you can read more about this in this issue.
You can use both techniques locking and gap detecion to mitigate gaps happening in the subscriptions.
You can also define how often the gap detection should re-check the gap and how long it should wait, in this example we instantly retry the first time, then we wait 500ms and after that we check a last time after 1 second.
patchlevel_event_sourcing:
subscription:
gap_detection:
retries_in_ms: [0, 500, 1000]Another config option is to define the detection window. The option defines the timeframe from now if we should check
for a gap. It's defined as an DateInterval so you need to
provide a valid string for it.
patchlevel_event_sourcing:
subscription:
gap_detection:
detection_window: 'PT5M'You can enable the command bus integration to use your aggregates as command handlers. For this bundle we provide only a symfony messenger integration, so you have to define the bus in the messenger configuration.
framework:
messenger:
default_bus: command.bus
buses:
command.bus: ~After this, you need to define the command bus in the event sourcing configuration. This will automatically register the aggregate handlers for the symfony messenger, so you can handle commands with your aggregates.
patchlevel_event_sourcing:
command_bus:
service: command.busYou can find out more about the command bus and the aggregate handlers here.
You can define the default instant retry configuration for the command bus. This will be used if you don't define a retry configuration for a specific command.
patchlevel_event_sourcing:
command_bus:
instant_retry:
default_max_retries: 3
default_exceptions:
- Patchlevel\EventSourcing\Repository\AggregateOutdatedYou can find out more about instant retry here.
You can enable the query bus integration to use queries to retrieve data from your system. For this bundle we provide only a symfony messenger integration, so you have to define the bus in the messenger configuration.
framework:
messenger:
buses:
query.bus: ~After this, you need to define the query bus in the event sourcing configuration. This will automatically register the handlers for the symfony messenger, so you can handle queries in your services.
patchlevel_event_sourcing:
query_bus:
service: query.busYou can find out more about the query bus here.
You can enable the event bus to listen for events and messages synchronously. But you should consider using the subscription engine for this.
patchlevel_event_sourcing:
event_bus: ~Default is the patchlevel event bus.
First of all we have our own default event bus.
This works best with the library, as the #[Subscribe] attribute is used there, among other things.
patchlevel_event_sourcing:
event_bus:
type: defaultYou don't have to specify this as it is the default value.
But you can also use Symfony Messenger. To do this, you first have to define a suitable message bus. This must be "allow_no_handlers" so that this messenger can be an event bus according to the definition.
# messenger.yaml
framework:
messenger:
buses:
event.bus:
default_middleware: allow_no_handlersWe can then use this messenger or event bus in event sourcing:
patchlevel_event_sourcing:
event_bus:
type: symfony
service: event.busSince the event bus was replaced, event sourcing own attributes no longer work. You use the Symfony attributes instead.
use Patchlevel\EventSourcing\EventBus\Message;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler('event.bus')]
class SmsNotificationHandler
{
public function __invoke(Message $message): void
{
if (!$message instanceof GuestIsCheckedIn) {
return;
}
// ... do some work - like sending an SMS message!
}
}You can also use any other event bus that implements the PSR-14 standard.
patchlevel_event_sourcing:
event_bus:
type: psr14
service: my.event.bus.serviceLike the Symfony event bus, the event sourcing attributes no longer work here. You have to use the system that comes with the respective psr14 implementation.
You can also use your own event bus that implements the Patchlevel\EventSourcing\EventBus\EventBus interface.
patchlevel_event_sourcing:
event_bus:
type: custom
service: my.event.bus.serviceLike the Symfony event bus, the event sourcing attributes no longer work here. You have to use the system that comes with the respective custom implementation.
You can use symfony cache to define the target of the snapshot store.
framework:
cache:
default_redis_provider: 'redis://localhost'
pools:
event_sourcing.cache:
adapter: cache.adapter.redisAfter this, you need define the snapshot store. Symfony cache implement the psr6 interface, so we need choose this type and enter the id from the cache service.
patchlevel_event_sourcing:
snapshot_stores:
default:
service: event_sourcing.cacheFinally, you have to tell the aggregate that it should use this snapshot store.
namespace App\Profile\Domain;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Snapshot;
#[Aggregate(name: 'profile')]
#[Snapshot('default')]
final class Profile extends BasicAggregateRoot
{
// ...
}You can find out more about snapshots here.
You can use the library to encrypt and decrypt personal data. For this you need to enable the crypto shredding.
patchlevel_event_sourcing:
cryptography:
use_encrypted_field_name: trueYou should activate use_encrypted_field_name to mark the fields that are encrypted.
That allows you later to migrate not encrypted fields to encrypted fields.
If you have already encrypted fields, you can activate fallback_to_field_name to use the old field name as fallback.
If you want to use another algorithm, you can specify this here:
patchlevel_event_sourcing:
cryptography:
algorithm: 'aes-256-gcm'You can find out more about personal data here.
The clock is used to return the current time as DateTimeImmutable.
You can freeze the clock for testing purposes:
when@test:
patchlevel_event_sourcing:
clock:
freeze: '2020-01-01 22:00:00'If freeze is not set, then the system clock is used.
Since symfony 6.2 there is a clock implementation based on psr-20 that you can use.
composer require symfony/clockpatchlevel_event_sourcing:
clock:
service: 'clock'You can also use your own implementation of your choice. They only have to implement the interface of the psr-20. You can then specify this service here:
patchlevel_event_sourcing:
clock:
service: 'my_own_clock_service'