Watch
implements CommandSubscriber
Operation for creating a change stream with the aggregate command.
Note: the implementation of CommandSubscriber is an internal implementation detail and should not be considered part of the public API.
Table of Contents
Interfaces
- CommandSubscriber
Constants
- FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off'
- FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required'
- FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable'
- FULL_DOCUMENT_REQUIRED = 'required'
- FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'
- FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable'
Properties
- $aggregate : Aggregate
- $aggregateOptions : array<string|int, mixed>
- $changeStreamOptions : array<string|int, mixed>
- $codec : DocumentCodec|null
- $collectionName : string|null
- $databaseName : string
- $firstBatchSize : int
- $hasResumed : bool
- $manager : Manager
- $operationTime : TimestampInterface|null
- $pipeline : array<string|int, mixed>
- $postBatchResumeToken : object|null
Methods
- __construct() : mixed
- Constructs an aggregate command for creating a change stream.
- execute() : ChangeStream
- Execute the operation.
- createAggregate() : Aggregate
- Create the aggregate command for a change stream.
- createChangeStreamIterator() : ChangeStreamIterator
- Create a ChangeStreamIterator by executing the aggregate command.
- executeAggregate() : CursorInterface
- Execute the aggregate command.
- getInitialResumeToken() : array<string|int, mixed>|object|null
- Return the initial resume token for creating the ChangeStreamIterator.
- resume() : ChangeStreamIterator
- Resumes a change stream.
- shouldCaptureOperationTime() : bool
- Determine whether to capture operation time from an aggregate response.
Constants
FULL_DOCUMENT_BEFORE_CHANGE_OFF
public mixed FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off'
FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED
public mixed FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required'
FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE
public mixed FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable'
FULL_DOCUMENT_REQUIRED
public mixed FULL_DOCUMENT_REQUIRED = 'required'
FULL_DOCUMENT_UPDATE_LOOKUP
public mixed FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'
FULL_DOCUMENT_WHEN_AVAILABLE
public mixed FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable'
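The constants above are the string values documented for the "fullDocument" and "fullDocumentBeforeChange" options. As a rough illustration, the sketch below checks a value against those lists in plain PHP. The `DOCUMENTED_VALUES` array and the `isDocumentedValue()` helper are hypothetical names introduced here and are not part of the library; the server may also accept values that have no constant on this page.

```php
<?php

declare(strict_types=1);

// Values copied from the Watch constants listed above. The array itself is
// illustrative only and is not defined by the library.
const DOCUMENTED_VALUES = [
    'fullDocument' => ['updateLookup', 'whenAvailable', 'required'],
    'fullDocumentBeforeChange' => ['off', 'whenAvailable', 'required'],
];

/**
 * Hypothetical helper: report whether $value matches one of the constant
 * values documented for the given option name.
 */
function isDocumentedValue(string $option, string $value): bool
{
    return in_array($value, DOCUMENTED_VALUES[$option] ?? [], true);
}

var_dump(isDocumentedValue('fullDocument', 'updateLookup'));
var_dump(isDocumentedValue('fullDocumentBeforeChange', 'updateLookup'));
```

Note that 'required' and 'whenAvailable' appear under both options, which is why the check is keyed by option name rather than by value alone.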
Properties
$aggregate
private Aggregate $aggregate
$aggregateOptions
private array<string|int, mixed> $aggregateOptions
$changeStreamOptions
private array<string|int, mixed> $changeStreamOptions
$codec
private DocumentCodec|null $codec
$collectionName
private string|null $collectionName
$databaseName
private string $databaseName
$firstBatchSize
private int $firstBatchSize = 0
$hasResumed
private bool $hasResumed = false
$manager
private Manager $manager
$operationTime
private TimestampInterface|null $operationTime = null
$pipeline
private array<string|int, mixed> $pipeline
$postBatchResumeToken
private object|null $postBatchResumeToken = null
Methods
__construct()
Constructs an aggregate command for creating a change stream.
public
__construct(Manager $manager, string|null $databaseName, string|null $collectionName, array<string|int, mixed> $pipeline[, array<string|int, mixed> $options = [] ]) : mixed
Supported options:
- batchSize (integer): The number of documents to return per batch.
- codec (MongoDB\Codec\DocumentCodec): Codec used to decode documents from BSON to PHP objects.
- collation (document): Specifies a collation.
- comment (mixed): BSON value to attach as a comment to this command.
  Only string values are supported for server versions < 4.4.
- fullDocument (string): Determines how the "fullDocument" response field will be populated for update operations.
  By default, change streams only return the delta of fields (via an "updateDescription" field) for update operations and "fullDocument" is omitted. Insert and replace operations always include the "fullDocument" field. Delete operations omit the field as the document no longer exists.
  Specify "updateLookup" to return the current majority-committed version of the updated document.
  MongoDB 6.0+ allows returning the post-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify "whenAvailable" to return the post-image if available or a null value if not. Specify "required" to return the post-image if available or raise an error if not.
- fullDocumentBeforeChange (string): Determines how the "fullDocumentBeforeChange" response field will be populated. By default, the field is omitted.
  MongoDB 6.0+ allows returning the pre-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify "whenAvailable" to return the pre-image if available or a null value if not. Specify "required" to return the pre-image if available or raise an error if not.
- maxAwaitTimeMS (integer): The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
- readConcern (MongoDB\Driver\ReadConcern): Read concern.
- readPreference (MongoDB\Driver\ReadPreference): Read preference. This will be used to select a new server when resuming. Defaults to a "primary" read preference.
- resumeAfter (document): Specifies the logical starting point for the new change stream.
  Using this option in conjunction with "startAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.
- session (MongoDB\Driver\Session): Client session.
- showExpandedEvents (boolean): Enables the server to send the expanded list of change stream events.
  This option is not supported for server versions < 6.0.
- startAfter (document): Specifies the logical starting point for the new change stream. Unlike "resumeAfter", this option can be used with a resume token from an "invalidate" event.
  Using this option in conjunction with "resumeAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.
- startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified, the change stream will only provide changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. Alternatively, an operation time may be obtained from MongoDB\Driver\Server::getInfo().
  Using this option in conjunction with "resumeAfter" and/or "startAfter" will result in a server error. The options are mutually exclusive.
  This option is not supported for server versions < 4.0.
- typeMap (array): Type map for BSON deserialization. This will be applied to the returned Cursor (it is not sent to the server).
Note: A database-level change stream may be created by specifying null for the collection name. A cluster-level change stream may be created by specifying null for both the database and collection name.
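Because "resumeAfter", "startAfter", and "startAtOperationTime" are mutually exclusive and the conflict is otherwise reported only as a server error, it can be convenient to check an options array before it is passed to the constructor. The following is a minimal sketch of such a pre-check in plain PHP. The helpers `startingPointOptions()` and `assertSingleStartingPoint()` are hypothetical and not part of the library, and the resume token values are placeholders rather than real tokens.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): return the names of the
 * mutually exclusive starting-point options that are present in $options.
 *
 * @param array<string, mixed> $options
 * @return list<string>
 */
function startingPointOptions(array $options): array
{
    $exclusive = ['resumeAfter', 'startAfter', 'startAtOperationTime'];

    return array_values(array_filter(
        $exclusive,
        static fn (string $name): bool => array_key_exists($name, $options)
    ));
}

/**
 * Hypothetical helper: throw before any command is sent if more than one
 * starting point is specified.
 *
 * @param array<string, mixed> $options
 */
function assertSingleStartingPoint(array $options): void
{
    $present = startingPointOptions($options);

    if (count($present) > 1) {
        throw new InvalidArgumentException(
            'Mutually exclusive options specified: ' . implode(', ', $present)
        );
    }
}

// Example option arrays; the token contents are placeholders.
$ok = [
    'fullDocument' => 'updateLookup',
    'maxAwaitTimeMS' => 500,
    'startAfter' => ['_data' => 'placeholder'],
];
$conflicting = $ok + ['resumeAfter' => ['_data' => 'placeholder']];

assertSingleStartingPoint($ok); // a single starting point passes silently

try {
    assertSingleStartingPoint($conflicting);
} catch (InvalidArgumentException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

A client-side check like this only duplicates what the server enforces; its benefit is an earlier, more specific error message.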
Parameters
- $manager : Manager
  Manager instance from the driver
- $databaseName : string|null
  Database name
- $collectionName : string|null
  Collection name
- $pipeline : array<string|int, mixed>
  Aggregation pipeline
- $options : array<string|int, mixed> = []
  Command options
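The nullable $databaseName and $collectionName parameters determine whether the change stream is collection-level, database-level, or cluster-level, as described in the note on null names. The sketch below spells out that mapping in plain PHP. `changeStreamScope()` is a hypothetical helper, not a library function, and the names 'shop' and 'orders' are placeholders. How the sketch treats a collection name given without a database name is an assumption, since this page does not cover that combination.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): describe the scope implied
 * by the $databaseName and $collectionName constructor arguments.
 */
function changeStreamScope(?string $databaseName, ?string $collectionName): string
{
    if ($databaseName === null && $collectionName === null) {
        return 'cluster';
    }

    if ($databaseName === null) {
        // Assumption of this sketch: a collection name without a database
        // name is rejected. The page above does not describe this case.
        throw new InvalidArgumentException('A collection name requires a database name');
    }

    return $collectionName === null ? 'database' : 'collection';
}

echo changeStreamScope('shop', 'orders'), PHP_EOL;
echo changeStreamScope('shop', null), PHP_EOL;
echo changeStreamScope(null, null), PHP_EOL;
```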
execute()
Execute the operation.
public
execute(Server $server) : ChangeStream
Parameters
- $server : Server
Return values
ChangeStream
createAggregate()
Create the aggregate command for a change stream.
private
createAggregate() : Aggregate
This method is also used to recreate the aggregate command when resuming.
Return values
Aggregate
createChangeStreamIterator()
Create a ChangeStreamIterator by executing the aggregate command.
private
createChangeStreamIterator(Server $server) : ChangeStreamIterator
Parameters
- $server : Server
Return values
ChangeStreamIterator
executeAggregate()
Execute the aggregate command.
private
executeAggregate(Server $server) : CursorInterface
The command is executed with command monitoring (APM) so that data can be captured from its response (e.g. the firstBatch size and postBatchResumeToken).
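To make the captured data concrete, the sketch below reads the first batch size and the postBatchResumeToken from an object shaped like an aggregate command reply. It is plain PHP and does not show the class's actual subscriber code. `summarizeAggregateReply()` is a hypothetical helper, and the `$reply` value is a hand-built stand-in for what a command-succeeded monitoring event would carry.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): extract the first batch size
 * and the postBatchResumeToken from an aggregate command reply.
 *
 * @return array{firstBatchSize: int, postBatchResumeToken: object|null}
 */
function summarizeAggregateReply(object $reply): array
{
    $cursor = $reply->cursor ?? null;

    // Fall back to an empty batch when the reply has no cursor.firstBatch array.
    $firstBatch = is_object($cursor) && isset($cursor->firstBatch) && is_array($cursor->firstBatch)
        ? $cursor->firstBatch
        : [];

    // The token is optional in the reply, so null is a valid result.
    $token = is_object($cursor) && isset($cursor->postBatchResumeToken) && is_object($cursor->postBatchResumeToken)
        ? $cursor->postBatchResumeToken
        : null;

    return [
        'firstBatchSize' => count($firstBatch),
        'postBatchResumeToken' => $token,
    ];
}

// Stand-in for a decoded reply; the token content is a placeholder.
$reply = (object) [
    'cursor' => (object) [
        'id' => 0,
        'firstBatch' => [(object) ['operationType' => 'insert']],
        'postBatchResumeToken' => (object) ['_data' => 'placeholder'],
    ],
    'ok' => 1.0,
];

$summary = summarizeAggregateReply($reply);
echo $summary['firstBatchSize'], PHP_EOL;
```

These two values correspond to the $firstBatchSize and $postBatchResumeToken properties listed earlier on this page.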
Parameters
- $server : Server
Return values
CursorInterface
getInitialResumeToken()
Return the initial resume token for creating the ChangeStreamIterator.
private
getInitialResumeToken() : array<string|int, mixed>|object|null
Return values
array<string|int, mixed>|object|null
resume()
Resumes a change stream.
private
resume([array<string|int, mixed>|object|null $resumeToken = null ][, bool $hasAdvanced = false ]) : ChangeStreamIterator
Parameters
- $resumeToken : array<string|int, mixed>|object|null = null
- $hasAdvanced : bool = false
Return values
ChangeStreamIterator
shouldCaptureOperationTime()
Determine whether to capture operation time from an aggregate response.
private
shouldCaptureOperationTime() : bool