Aggregate
Run an aggregation pipeline on a MongoDB collection.
type: "io.kestra.plugin.mongodb.Aggregate"
Examples
Simple aggregation pipeline to group and sum data
id: mongodb_aggregate
namespace: company.team
tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $match:
          status: "active"
      - $group:
          _id: "$category"
          total:
            $sum: "$amount"
          count:
            $sum: 1
      - $sort:
          total: -1
Complex aggregation with lookup and data transformation
id: mongodb_complex_aggregate
namespace: company.team
tasks:
  - id: aggregate_with_lookup
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "users"
    pipeline:
      - $lookup:
          from: "orders"
          localField: "_id"
          foreignField: "userId"
          as: "userOrders"
      - $addFields:
          totalOrders:
            $size: "$userOrders"
          totalSpent:
            $sum: "$userOrders.amount"
      - $project:
          name: 1
          email: 1
          totalOrders: 1
          totalSpent: 1
      - $match:
          totalOrders:
            $gt: 0
    allowDiskUse: true
    maxTimeMs: 30000
Properties
collection *Required string
MongoDB collection.
connection *Required Non-dynamic MongoDbConnection
MongoDB connection properties.
database *Required string
MongoDB database.
pipeline *Required array
MongoDB aggregation pipeline
List of pipeline stages, given as a BSON array or a list of maps
allowDiskUse boolean | string
Default: true
Whether to allow disk usage for stages
Enables writing to temporary files when a pipeline stage exceeds the 100 megabyte limit
batchSize integer | string
Default: 1000
Batch size for cursor
Sets the number of documents to return per batch
maxTimeMs integer | string
Default: 60000
Maximum execution time in milliseconds
Sets the maximum execution time on the server for this operation
store string
Default: FETCH
Possible values: STORE, FETCH, FETCH_ONE, NONE
Whether to store the data from the aggregation result into an Ion-serialized data file
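The optional properties above can be combined in a single task. The following is a minimal sketch, not a recommended configuration: the flow id `mongodb_aggregate_store` and the specific values chosen for `batchSize`, `maxTimeMs`, and `allowDiskUse` are illustrative assumptions, while the connection, database, and collection are reused from the examples earlier on this page.

```yaml
id: mongodb_aggregate_store   # hypothetical flow id, for illustration only
namespace: company.team
tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $match:
          status: "active"
    store: STORE          # one of STORE, FETCH, FETCH_ONE, NONE (default FETCH)
    batchSize: 500        # documents per cursor batch (default 1000)
    maxTimeMs: 10000      # server-side time limit in milliseconds (default 60000)
    allowDiskUse: false   # default is true
```

Setting `store: STORE` is the option to consider when the result set may be too large to keep in the task output directly.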
Outputs
rows array
List containing the aggregation results
Only populated when the store property is set to FETCH
size integer
The number of documents returned by the aggregation
uri string
Format: uri
URI of the file containing the aggregation results
Only populated when the store property is set to STORE
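Outputs can be read by later tasks in the same flow through Kestra's `outputs` expression. The sketch below assumes a second task of type `io.kestra.plugin.core.log.Log`, which is not part of this plugin, and a hypothetical flow id `mongodb_aggregate_outputs`. It only reads the `size` output.

```yaml
id: mongodb_aggregate_outputs   # hypothetical flow id, for illustration only
namespace: company.team
tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $group:
          _id: "$category"
          total:
            $sum: "$amount"

  # Assumed follow-up task: logs how many documents the aggregation returned.
  - id: log_size
    type: io.kestra.plugin.core.log.Log
    message: "Aggregation returned {{ outputs.aggregate.size }} documents"
```

The same pattern should apply to `rows` and `uri`, depending on which of them the chosen `store` value populates.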
Metrics
records counter
Unit: count
Number of documents returned by the aggregation pipeline
Definitions
io.kestra.plugin.mongodb.MongoDbConnection
uri *Required string
Connection string to MongoDB server
URL format like mongodb://mongodb0.example.com:27017
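The examples on this page embed credentials directly in the connection string. One possible alternative is to read the URI from a Kestra secret, sketched below. This assumes that a secret named `MONGODB_URI` has been configured in the Kestra instance and that the `uri` property is rendered as an expression. Both the secret name and the flow id are hypothetical.

```yaml
id: mongodb_aggregate_secret_uri   # hypothetical flow id, for illustration only
namespace: company.team
tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      # Assumes a secret called MONGODB_URI holding a full connection string,
      # e.g. mongodb://user:password@mongodb0.example.com:27017/?authSource=admin
      uri: "{{ secret('MONGODB_URI') }}"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $match:
          status: "active"
```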