DynamoDB
Process DynamoDB Streams (table change-data-capture) in a Lambda triggered by a stream event source mapping.
AWS documentation
- Using AWS Lambda with Amazon DynamoDB
- Change data capture with DynamoDB Streams
- DynamoDB Streams Lambda Integration error handling
What AWS sends
event.Records is a batch of change records. Each record has eventName (INSERT, MODIFY, REMOVE), eventSource: 'aws:dynamodb', dynamodb.Keys, and depending on StreamViewType, dynamodb.NewImage and/or dynamodb.OldImage in DynamoDB's typed attribute format ({ id: { S: "abc" } }).
DynamoDB Streams use the same partial-batch checkpoint model as Kinesis: Lambda checkpoints to the lowest failed sequence number and replays from there. Use FunctionResponseTypes: [ReportBatchItemFailures] to report partial failures.
Recommended middlewares
| Middleware | Why |
|---|---|
@middy/event-normalizer | Unmarshal NewImage / OldImage from typed format to plain JS |
@middy/event-batch-handler | Per-record handler |
@middy/event-batch-response | Report batchItemFailures |
Example
import middy from '@middy/core'
import eventNormalizer from '@middy/event-normalizer'
import eventBatchResponse from '@middy/event-batch-response'
import eventBatchHandler from '@middy/event-batch-handler'
const recordHandler = async (record, context) => {
if (record.eventName === 'REMOVE') return // ignore deletes
// record.dynamodb.NewImage is now plain JS (event-normalizer unmarshalled it)
await indexItem(record.dynamodb.NewImage)
}
const lambdaHandler = eventBatchHandler(recordHandler)
export const handler = middy()
.use(eventNormalizer())
.use(eventBatchResponse())
.handler(lambdaHandler) With Durable Functions
DynamoDB Streams use the same partial-batch checkpoint model as Kinesis. Wrapping the handler in withDurableExecution lets event-batch-handler auto-checkpoint each record so prior writes (e.g. to a search index, cache, or downstream API) do not repeat on replay.
import { withDurableExecution } from '@aws/durable-execution-sdk-js'
import middy from '@middy/core'
import eventNormalizer from '@middy/event-normalizer'
import eventBatchResponse from '@middy/event-batch-response'
import eventBatchHandler from '@middy/event-batch-handler'
const recordHandler = async (record, ctx) => {
const change = record.dynamodb
await ctx.step('index', async () => searchIndex.upsert(change.NewImage))
await ctx.step('audit', async () => auditLog.write(change))
}
const lambdaHandler = eventBatchHandler(recordHandler)
export const handler = withDurableExecution(
middy()
.use(eventNormalizer())
.use(eventBatchResponse())
.handler(lambdaHandler)
) IaC: required event source mapping
See the DynamoDB Streams recipe for CloudFormation/SAM/CDK snippets.
Common gotchas
OldImageonly present with the rightStreamViewType. SetNEW_AND_OLD_IMAGES(orOLD_IMAGE) on the table stream if you need it.REMOVErecords have noNewImage. Handle deletes explicitly.- Whole-batch replay. Without
ReportBatchItemFailures, any error replays the whole batch and everything after it - quickly catastrophic. - Hot shards. A single partition key writing rapidly can throttle the consumer. Increase
ParallelizationFactoron the event source mapping.
Related
- DynamoDB Streams recipe
@middy/event-normalizer@middy/dynamodb- fetch config from DynamoDB tables- Kinesis Streams
Last updated: