In the end, the messages have to be saved somewhere. Each message contains an event and the associated headers.
More information can be found in the message documentation.
The store is optimized to efficiently store and load events for aggregates.
We offer several stores for the messages: two based on Doctrine DBAL and one in-memory store for testing purposes.
The DoctrineDbalStore is the current default store for event sourcing.
You can create a store with the DoctrineDbalStore class.
The store needs a DBAL connection and an event serializer, and accepts optional parameters such as options.
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
$connection = DriverManager::getConnection(
    (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);
$store = new DoctrineDbalStore(
    $connection,
    DefaultEventSerializer::createFromPaths(['src/Event']),
);

You can find out more about how to create a connection in the doctrine dbal documentation.
The following options are available in DoctrineDbalStore:
| Option | Type | Default | Description |
|---|---|---|---|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid"/"string" | uuid | The type of the aggregate_id column |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
The table structure of the DoctrineDbalStore looks like this:
| Column | Type | Description |
|---|---|---|
| id | bigint | The index of the whole stream (autoincrement) |
| aggregate | string | The name of the aggregate |
| aggregate_id | uuid/string | The id of the aggregate |
| playhead | int | The current playhead of the aggregate |
| event | string | The name of the event |
| payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | If the event is the first event of the aggregate |
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |
The default type of the aggregate_id column is uuid if the database supports it and string if not.
You can change the type with the aggregate_id_type option to string if you want to use a custom id.
We offer a new store called StreamDoctrineDbalStore.
This store is decoupled from the aggregate and can be used to store events from other sources.
The difference to the DoctrineDbalStore is that the StreamDoctrineDbalStore merges the aggregate id
and the aggregate name into one column named stream. Additionally, the column playhead is nullable.
This store introduces two new methods: streams and remove.
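To make the merged stream column more concrete, here is a minimal sketch that composes a stream name from an aggregate name and an aggregate id. The `streamName` helper and the `name-id` format are assumptions for illustration, inferred from stream names such as `profile-1` used elsewhere in this document; the library may build the name differently.

```php
<?php

declare(strict_types=1);

// Hypothetical helper, not part of the library: joins the aggregate name and
// the aggregate id into one stream name, assuming a "name-id" format.
function streamName(string $aggregateName, string $aggregateId): string
{
    return $aggregateName . '-' . $aggregateId;
}

echo streamName('profile', '1'), PHP_EOL; // prints "profile-1"
```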
The store needs a DBAL connection and an event serializer, and accepts optional parameters such as options.
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
$connection = DriverManager::getConnection(
    (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);
$store = new StreamDoctrineDbalStore(
    $connection,
    DefaultEventSerializer::createFromPaths(['src/Event']),
);

You can find out more about how to create a connection in the doctrine dbal documentation.
The following options are available in StreamDoctrineDbalStore:
| Option | Type | Default | Description |
|---|---|---|---|
| table_name | string | event_store | The name of the table in the database |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
| keep_index | bool | false | If enabled, the index header is kept on save |
The table structure of the StreamDoctrineDbalStore looks like this:
| Column | Type | Description |
|---|---|---|
| id | bigint | The index of the whole stream (autoincrement) |
| stream | string | The name of the stream |
| playhead | ?int | The current playhead of the aggregate |
| event_id | string | The id of the event |
| event_name | string | The name of the event |
| event_payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | If the event is the first event of the aggregate |
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |
We also offer an in-memory store for testing purposes.
use Patchlevel\EventSourcing\Store\InMemoryStore;
$store = new InMemoryStore();

You can pass messages to the constructor to initialize the store with some events.
Last but not least, we offer two read-only stores: ReadOnlyStore for the DoctrineDbalStore and StreamReadOnlyStore for the StreamDoctrineDbalStore.
Each passes all method calls to the underlying store, but throws a StoreIsReadOnly exception when a write operation is attempted.
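The delegation described above is a classic decorator. The following self-contained sketch shows the idea in plain PHP; `SimpleStore`, `ArrayStore` and `ReadOnlySimpleStore` are hypothetical names invented for this illustration, and a `LogicException` stands in for the library's StoreIsReadOnly exception. It is not the library's implementation.

```php
<?php

declare(strict_types=1);

// Hypothetical, heavily simplified store contract for illustration only.
interface SimpleStore
{
    /** @return list<string> */
    public function load(): array;

    public function save(string ...$messages): void;
}

// Writable in-memory implementation.
final class ArrayStore implements SimpleStore
{
    /** @var list<string> */
    private array $messages = [];

    public function load(): array
    {
        return $this->messages;
    }

    public function save(string ...$messages): void
    {
        foreach ($messages as $message) {
            $this->messages[] = $message;
        }
    }
}

// Decorator: read calls are delegated, write calls are rejected.
final class ReadOnlySimpleStore implements SimpleStore
{
    public function __construct(private readonly SimpleStore $inner)
    {
    }

    public function load(): array
    {
        return $this->inner->load();
    }

    public function save(string ...$messages): void
    {
        // Stand-in for the library's StoreIsReadOnly exception.
        throw new LogicException('store is read only');
    }
}

$store = new ArrayStore();
$store->save('profile.created');

$readOnly = new ReadOnlySimpleStore($store);
$loaded = $readOnly->load(); // delegated to the wrapped store

$rejected = false;
try {
    $readOnly->save('profile.name_changed');
} catch (LogicException) {
    $rejected = true; // the write never reached the wrapped store
}
```

Wrapping rather than subclassing keeps the read-only guarantee independent of the concrete store behind it.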
use Patchlevel\EventSourcing\Store\ReadOnlyStore;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\StreamReadOnlyStore;
use Patchlevel\EventSourcing\Store\StreamStore;
/** @var Store $store */
$readOnlyStore = new ReadOnlyStore($store);
/** @var StreamStore $store */
$readOnlyStore = new StreamReadOnlyStore($store);

With the help of the SchemaDirector, the database structure can be created, updated and deleted.
You can also use doctrine migration to create and keep your schema in sync.
The SchemaDirector is responsible for creating, updating and deleting the database schema.
The DoctrineSchemaDirector is a concrete implementation of the SchemaDirector for doctrine dbal.
Additionally, it implements the DryRunSchemaDirector interface, which returns the SQL statements that would be executed.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\Store;
/**
 * @var Connection $connection
 * @var Store $store
 */
$schemaDirector = new DoctrineSchemaDirector(
    $connection,
    $store,
);

How to set up CLI commands for the schema director can be found in the cli documentation.
You can create the table from scratch using the create method.
use Patchlevel\EventSourcing\Schema\SchemaDirector;
/** @var SchemaDirector $schemaDirector */
$schemaDirector->create();

Alternatively, it can return the SQL statements that would be necessary, either for a dry run or to define your own migrations.
use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector;
/** @var DryRunSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunCreate();

The update method compares the current state in the database with how the table should be structured. The resulting diff is then executed to bring the table to the desired state.
use Patchlevel\EventSourcing\Schema\SchemaDirector;
/** @var SchemaDirector $schemaDirector */
$schemaDirector->update();

Alternatively, it can return the SQL statements that would be necessary for the update.
use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector;
/** @var DryRunSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunUpdate();

You can also delete the table with the drop method.
use Patchlevel\EventSourcing\Schema\SchemaDirector;
/** @var SchemaDirector $schemaDirector */
$schemaDirector->drop();

Alternatively, it can return the SQL statements that would be necessary to drop the table.
use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector;
/** @var DryRunSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunDrop();

You can use doctrine migration, which is known from doctrine orm, to create your schema and keep it in sync.
We have added a DoctrineMigrationSchemaProvider for doctrine migrations so that you just have to plug the whole thing
together.
use Doctrine\DBAL\Connection;
use Doctrine\Migrations\Configuration\Connection\ExistingConnection;
use Doctrine\Migrations\Configuration\Migration\ConfigurationLoader;
use Doctrine\Migrations\DependencyFactory;
use Doctrine\Migrations\Provider\SchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\Store;
// event sourcing schema director configuration
/**
 * @var Connection $connection
 * @var Store $store
 */
$schemaDirector = new DoctrineSchemaDirector(
    $connection,
    $store,
);
$schemaProvider = new DoctrineMigrationSchemaProvider($schemaDirector);
// doctrine migration configuration
/** @var ConfigurationLoader $configLoader */
$dependencyFactory = DependencyFactory::fromConnection(
    $configLoader,
    new ExistingConnection($connection),
);
$dependencyFactory->setService(
    SchemaProvider::class,
    $schemaProvider,
);

More information on how to configure doctrine migration can be found in the doctrine migration documentation.
How to set up CLI commands for doctrine migration can be found in the cli documentation.
The store has a few methods to interact with the database.
You can load all events from an aggregate with the load method.
This method returns a Stream object, which is a collection of events.
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$stream = $store->load();

The load method also has a few parameters to filter, limit and sort the events.
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$stream = $store->load(
    new Criteria(), // filter criteria
    100, // limit
    50, // offset
    true, // latest first
);

The Criteria object is used to filter the events.
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
$criteria = new Criteria(
    new AggregateNameCriterion('profile'),
    new AggregateIdCriterion('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e'),
    new FromPlayheadCriterion(2),
    new FromIndexCriterion(100),
    new ArchivedCriterion(true),
    new EventsCriterion(['profile.created', 'profile.name_changed']),
);

Or you can use the criteria builder to create the criteria.
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
$criteria = (new CriteriaBuilder())
    ->aggregateName('profile')
    ->aggregateId('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e')
    ->fromPlayhead(2)
    ->fromIndex(100)
    ->archived(true)
    ->events(['profile.created', 'profile.name_changed'])
    ->build();

The load method returns a Stream object, which behaves like a generator.
This means that the messages are only loaded when they are needed.
use Patchlevel\EventSourcing\Store\Stream;
/** @var Stream $stream */
$stream->index(); // get the index of the stream
$stream->position(); // get the current position of the stream
$stream->current(); // get the current event
$stream->next(); // move to the next event
$stream->end(); // check if the stream is at the end
foreach ($stream as $message) {
    $message->event(); // get the event
}

You can find more information about the message object in the message documentation.
The stream cannot rewind, so you can only iterate over it once.
If you want to iterate over it again, you have to call the load method again.
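The one-pass behaviour can be illustrated with a plain PHP generator. This is only a sketch: `loadMessages` is a hypothetical stand-in for a store's load method, and the library's Stream may behave differently in detail when iterated a second time.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a store: yields messages lazily, one at a time.
function loadMessages(): Generator
{
    yield 'profile.created';
    yield 'profile.name_changed';
}

$stream = loadMessages();

// First pass: consumes the generator.
$firstPass = [];
foreach ($stream as $message) {
    $firstPass[] = $message;
}

// Second pass: a consumed PHP generator cannot be traversed again,
// so PHP throws an Exception as soon as foreach tries to rewind it.
$rewindFailed = false;
try {
    foreach ($stream as $message) {
        // never reached
    }
} catch (Exception) {
    $rewindFailed = true;
}

// To iterate twice, load again or buffer the messages in an array.
$buffered = iterator_to_array(loadMessages(), false);
```

Buffering trades the memory advantage of lazy loading for the ability to iterate repeatedly, so it is best reserved for small result sets.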
You can count the number of events in the store with the count method.
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$count = $store->count();

The count method can also filter the events.
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$count = $store->count(
    new Criteria(), // filter criteria
);

You can save a message with the save method.
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Store;
/**
 * @var Store $store
 * @var Message $message
 * @var Message $message1
 * @var Message $message2
 * @var Message $message3
 * @var list<Message> $messages
 */
$store->save($message);
$store->save($message1, $message2, $message3);
$store->save(...$messages);

Saving happens in a transaction, so either all messages are saved or none are. The store locks the table for writing during each save by default.
Use the transactional method if you want to call multiple save methods in one transaction.
It is not possible to update events. In event sourcing, the events are immutable.
You can remove a stream with the remove method.
use Patchlevel\EventSourcing\Store\StreamStore;
/** @var StreamStore $store */
$store->remove('profile-*');

This method is only available on StreamStore implementations such as StreamDoctrineDbalStore.
You can list all streams with the streams method.
use Patchlevel\EventSourcing\Store\StreamStore;
/** @var StreamStore $store */
$streams = $store->streams(); // ['profile-1', 'profile-2', 'profile-3']

This method is only available on StreamStore implementations such as StreamDoctrineDbalStore.
It is also possible to execute a function in a transaction. The store takes care of starting the transaction, committing it, and rolling it back if an error occurs.
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$store->transactional(static function () use ($command, $bankAccountRepository): void {
    $accountFrom = $bankAccountRepository->get($command->from());
    $accountTo = $bankAccountRepository->get($command->to());
    $accountFrom->transferMoney($command->to(), $command->amount());
    $accountTo->receiveMoney($command->from(), $command->amount());
    $bankAccountRepository->save($accountFrom);
    $bankAccountRepository->save($accountTo);
});

The store locks the table for writing during the transaction by default.
If you only want to save one aggregate, you don't have to use the transactional method. The save method in store and repository is already transactional.
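The begin, commit and rollback flow behind a transactional method can be sketched in plain PHP. `SketchStore` is a hypothetical in-memory class written for this illustration; it does not reflect how the library implements transactions or locking, it only shows why either all saves inside the function take effect or none do.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory store; illustrates the begin / commit / rollback
// flow only, not the library's implementation.
final class SketchStore
{
    /** @var list<string> */
    private array $committed = [];

    /** @var list<string> */
    private array $pending = [];

    public function save(string $message): void
    {
        $this->pending[] = $message;
    }

    public function transactional(Closure $function): void
    {
        $this->pending = []; // begin
        try {
            $function();
            // commit: everything saved inside the function becomes visible
            $this->committed = array_merge($this->committed, $this->pending);
        } catch (Throwable $e) {
            // rollback: nothing pending is committed, the error is passed on
            throw $e;
        } finally {
            $this->pending = [];
        }
    }

    /** @return list<string> */
    public function committed(): array
    {
        return $this->committed;
    }
}

$store = new SketchStore();

// Both saves succeed, so both are committed together.
$store->transactional(static function () use ($store): void {
    $store->save('money.sent');
    $store->save('money.received');
});

// The function fails after the first save, so that save is discarded.
try {
    $store->transactional(static function () use ($store): void {
        $store->save('money.sent');
        throw new RuntimeException('transfer failed');
    });
} catch (RuntimeException) {
    // the caller still sees the original error
}
```

After both calls, `$store->committed()` contains only the two messages from the first, successful transaction.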