
Watch
implements CommandSubscriber

Final: Yes

Operation for creating a change stream with the aggregate command.

Note: the implementation of CommandSubscriber is an internal implementation detail and should not be considered part of the public API.

Tags
see
Collection::watch()
see
https://mongodb.com/docs/manual/changeStreams/

Table of Contents

Interfaces

CommandSubscriber

Constants

FULL_DOCUMENT_BEFORE_CHANGE_OFF  = 'off'
FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED  = 'required'
FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE  = 'whenAvailable'
FULL_DOCUMENT_REQUIRED  = 'required'
FULL_DOCUMENT_UPDATE_LOOKUP  = 'updateLookup'
FULL_DOCUMENT_WHEN_AVAILABLE  = 'whenAvailable'

Properties

$aggregate  : Aggregate
$aggregateOptions  : array<string|int, mixed>
$changeStreamOptions  : array<string|int, mixed>
$codec  : DocumentCodec|null
$collectionName  : string|null
$databaseName  : string
$firstBatchSize  : int
$hasResumed  : bool
$manager  : Manager
$operationTime  : TimestampInterface|null
$pipeline  : array<string|int, mixed>
$postBatchResumeToken  : object|null

Methods

__construct()  : mixed
Constructs an aggregate command for creating a change stream.
execute()  : ChangeStream
Execute the operation.
createAggregate()  : Aggregate
Create the aggregate command for a change stream.
createChangeStreamIterator()  : ChangeStreamIterator
Create a ChangeStreamIterator by executing the aggregate command.
executeAggregate()  : CursorInterface
Execute the aggregate command.
getInitialResumeToken()  : array<string|int, mixed>|object|null
Return the initial resume token for creating the ChangeStreamIterator.
resume()  : ChangeStreamIterator
Resumes a change stream.
shouldCaptureOperationTime()  : bool
Determine whether to capture operation time from an aggregate response.

Constants

FULL_DOCUMENT_BEFORE_CHANGE_OFF

public mixed FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off'

FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED

public mixed FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required'

FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE

public mixed FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable'

FULL_DOCUMENT_REQUIRED

public mixed FULL_DOCUMENT_REQUIRED = 'required'

FULL_DOCUMENT_UPDATE_LOOKUP

public mixed FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'

FULL_DOCUMENT_WHEN_AVAILABLE

public mixed FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable'

Properties

$aggregateOptions

private array<string|int, mixed> $aggregateOptions

$changeStreamOptions

private array<string|int, mixed> $changeStreamOptions

$collectionName

private string|null $collectionName

$databaseName

private string $databaseName

$firstBatchSize

private int $firstBatchSize = 0

$hasResumed

private bool $hasResumed = false

$manager

private Manager $manager

$operationTime

private TimestampInterface|null $operationTime = null

$pipeline

private array<string|int, mixed> $pipeline

$postBatchResumeToken

private object|null $postBatchResumeToken = null

Methods

__construct()

Constructs an aggregate command for creating a change stream.

public __construct(Manager $manager, string|null $databaseName, string|null $collectionName, array<string|int, mixed> $pipeline[, array<string|int, mixed> $options = [] ]) : mixed

Supported options:

  • batchSize (integer): The number of documents to return per batch.

  • codec (MongoDB\Codec\DocumentCodec): Codec used to decode documents from BSON to PHP objects.

  • collation (document): Specifies a collation.

  • comment (mixed): BSON value to attach as a comment to this command.

    Only string values are supported for server versions < 4.4.

  • fullDocument (string): Determines how the "fullDocument" response field will be populated for update operations.

    By default, change streams only return the delta of fields (via an "updateDescription" field) for update operations and "fullDocument" is omitted. Insert and replace operations always include the "fullDocument" field. Delete operations omit the field as the document no longer exists.

    Specify "updateLookup" to return the current majority-committed version of the updated document.

    MongoDB 6.0+ allows returning the post-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify "whenAvailable" to return the post-image if available or a null value if not. Specify "required" to return the post-image if available or raise an error if not.

  • fullDocumentBeforeChange (string): Determines how the "fullDocumentBeforeChange" response field will be populated. By default, the field is omitted.

    MongoDB 6.0+ allows returning the pre-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify "whenAvailable" to return the pre-image if available or a null value if not. Specify "required" to return the pre-image if available or raise an error if not.

  • maxAwaitTimeMS (integer): The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • readConcern (MongoDB\Driver\ReadConcern): Read concern.

  • readPreference (MongoDB\Driver\ReadPreference): Read preference. This will be used to select a new server when resuming. Defaults to a "primary" read preference.

  • resumeAfter (document): Specifies the logical starting point for the new change stream.

    Using this option in conjunction with "startAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.

  • session (MongoDB\Driver\Session): Client session.

  • showExpandedEvents (boolean): Enables the server to send the expanded list of change stream events.

    This option is not supported for server versions < 6.0.

  • startAfter (document): Specifies the logical starting point for the new change stream. Unlike "resumeAfter", this option can be used with a resume token from an "invalidate" event.

    Using this option in conjunction with "resumeAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.

  • startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified, the change stream will only provide changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. Alternatively, an operation time may be obtained from MongoDB\Driver\Server::getInfo().

    Using this option in conjunction with "resumeAfter" and/or "startAfter" will result in a server error. The options are mutually exclusive.

    This option is not supported for server versions < 4.0.

  • typeMap (array): Type map for BSON deserialization. This will be applied to the returned Cursor (it is not sent to the server).
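
Because "resumeAfter", "startAfter", and "startAtOperationTime" are mutually exclusive, it can be convenient to catch a conflicting combination before the command reaches the server. The helper below is a minimal sketch of such a client-side check. The name findConflictingStartOptions() is hypothetical and not part of the library, and the token values are placeholders.

```php
<?php

/**
 * Return the mutually exclusive starting-point options present in $options.
 *
 * Hypothetical helper for illustration; it is not part of the library.
 *
 * @param array<string, mixed> $options Options intended for a change stream
 * @return list<string> Names of the starting-point options that are set
 */
function findConflictingStartOptions(array $options): array
{
    $exclusive = ['resumeAfter', 'startAfter', 'startAtOperationTime'];

    return array_values(array_filter(
        $exclusive,
        static fn (string $name): bool => isset($options[$name])
    ));
}

// Placeholder tokens; real resume tokens come from a change stream.
$options = [
    'resumeAfter' => ['_data' => 'token-a'],
    'startAfter' => ['_data' => 'token-b'],
];

$present = findConflictingStartOptions($options);

if (count($present) > 1) {
    echo 'Conflicting options: ', implode(', ', $present), PHP_EOL;
}
```

With the options above, both "resumeAfter" and "startAfter" are reported, so the caller can drop one of them before creating the change stream.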

Note: A database-level change stream may be created by specifying null for the collection name. A cluster-level change stream may be created by specifying null for both the database and collection name.
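
In application code, the three scopes described in the note are usually reached through the watch() helpers on the collection, database, and client objects rather than by constructing this operation directly. The following is a sketch under assumptions: the database name "test", the collection name "inventory", and the function name openChangeStreams() are placeholders chosen for illustration, and the class is assumed to live in the MongoDB\Operation namespace.

```php
<?php

use MongoDB\Client;
use MongoDB\Operation\Watch;

/**
 * Open one change stream per scope: collection, database, and cluster.
 *
 * Hypothetical helper; "test" and "inventory" are placeholder names.
 *
 * @return array<string, MongoDB\ChangeStream>
 */
function openChangeStreams(Client $client): array
{
    // Ask the server to include the current document for update events.
    $options = ['fullDocument' => Watch::FULL_DOCUMENT_UPDATE_LOOKUP];

    // Only report update events.
    $pipeline = [['$match' => ['operationType' => 'update']]];

    return [
        'collection' => $client->selectCollection('test', 'inventory')->watch($pipeline, $options),
        'database' => $client->selectDatabase('test')->watch($pipeline, $options),
        'cluster' => $client->watch($pipeline, $options),
    ];
}
```

Each returned object is a ChangeStream that can be iterated with rewind(), valid(), current(), and next().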

Parameters
$manager : Manager

Manager instance from the driver

$databaseName : string|null

Database name

$collectionName : string|null

Collection name

$pipeline : array<string|int, mixed>

Aggregation pipeline

$options : array<string|int, mixed> = []

Command options

Tags
throws
InvalidArgumentException

for parameter/option parsing errors

execute()

Execute the operation.

public execute(Server $server) : ChangeStream
Parameters
$server : Server
Tags
throws
UnsupportedException

if a collation or read concern is specified but not supported

throws
RuntimeException

for other driver errors (e.g. connection errors)

Return values
ChangeStream
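
The two documented exception types can be handled separately by the caller. The sketch below wraps Collection::watch(), which runs this operation; tryWatch() is a hypothetical name, and the fully qualified exception class names are assumptions based on the library and driver namespaces rather than something stated on this page.

```php
<?php

use MongoDB\ChangeStream;
use MongoDB\Collection;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Exception\UnsupportedException;

/**
 * Try to open a change stream, logging the documented failure modes.
 *
 * Hypothetical helper for illustration; returns null when the stream
 * could not be created.
 */
function tryWatch(Collection $collection, array $options = []): ?ChangeStream
{
    try {
        return $collection->watch([], $options);
    } catch (UnsupportedException $e) {
        // An option such as collation or readConcern is not supported.
        error_log('Unsupported change stream option: ' . $e->getMessage());
    } catch (DriverRuntimeException $e) {
        // Other driver errors, for example a connection failure.
        error_log('Driver error while opening change stream: ' . $e->getMessage());
    }

    return null;
}
```

Returning null keeps the sketch short; real code may prefer to rethrow or retry depending on the error.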

createAggregate()

Create the aggregate command for a change stream.

private createAggregate() : Aggregate

This method is also used to recreate the aggregate command when resuming.

Return values
Aggregate
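
A change stream is an aggregation whose first pipeline stage is $changeStream, so building (or rebuilding) the command largely amounts to prepending that stage to the user pipeline. The sketch below illustrates the idea only; buildChangeStreamPipeline() is a hypothetical name, the token is a placeholder, and the library's internal code may differ.

```php
<?php

/**
 * Prepend a $changeStream stage to a user-supplied pipeline.
 *
 * Hypothetical helper for illustration; not the library's implementation.
 *
 * @param array<int, array<string, mixed>> $pipeline            User pipeline stages
 * @param array<string, mixed>             $changeStreamOptions Stage options
 * @return array<int, array<string, mixed>>
 */
function buildChangeStreamPipeline(array $pipeline, array $changeStreamOptions): array
{
    // Cast to object so that an empty option set is encoded as a
    // document ({}) rather than as an array ([]).
    array_unshift($pipeline, ['$changeStream' => (object) $changeStreamOptions]);

    return $pipeline;
}

$userPipeline = [['$match' => ['operationType' => 'insert']]];

// Initial command.
$initial = buildChangeStreamPipeline($userPipeline, ['fullDocument' => 'updateLookup']);

// Rebuilt command when resuming from a (placeholder) token.
$resumed = buildChangeStreamPipeline($userPipeline, [
    'fullDocument' => 'updateLookup',
    'resumeAfter' => ['_data' => 'placeholder-token'],
]);
```

Only the stage options change between the initial and the resumed pipeline; the user-supplied stages are carried over unchanged.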

createChangeStreamIterator()

Create a ChangeStreamIterator by executing the aggregate command.

private createChangeStreamIterator(Server $server) : ChangeStreamIterator
Parameters
$server : Server
Return values
ChangeStreamIterator

executeAggregate()

Execute the aggregate command.

private executeAggregate(Server $server) : CursorInterface

The command is executed with command monitoring (APM) so that data can be captured from its response (e.g. the firstBatch size and postBatchResumeToken).

Parameters
$server : Server
Return values
CursorInterface
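
To make the captured data concrete, the sketch below extracts the firstBatch size and postBatchResumeToken from a reply-shaped object. It is an illustration under assumptions: summarizeAggregateReply() is a hypothetical name, and the sample reply is hand-built. In a real command subscriber the reply would come from CommandSucceededEvent::getReply().

```php
<?php

/**
 * Extract the first batch size and post-batch resume token from an
 * aggregate reply.
 *
 * Hypothetical helper for illustration; not the library's implementation.
 *
 * @return array{firstBatchSize: int, postBatchResumeToken: object|null}
 */
function summarizeAggregateReply(object $reply): array
{
    // The null coalescing operator tolerates missing fields at any level.
    $firstBatch = $reply->cursor->firstBatch ?? null;
    $token = $reply->cursor->postBatchResumeToken ?? null;

    return [
        'firstBatchSize' => is_array($firstBatch) ? count($firstBatch) : 0,
        'postBatchResumeToken' => is_object($token) ? $token : null,
    ];
}

// Hand-built stand-in for a server reply; all values are placeholders.
$reply = (object) [
    'cursor' => (object) [
        'firstBatch' => [],
        'postBatchResumeToken' => (object) ['_data' => 'placeholder-token'],
    ],
    'ok' => 1,
];

$summary = summarizeAggregateReply($reply);
```

An empty first batch together with a resume token is a common shape for a newly opened change stream, which is why both values are worth capturing.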

        