Kafka, Self-Managed
AWS Documentation
Example JSON
import middy from '@middy/core'
import eventBatchParser, { parseJson } from '@middy/event-batch-parser'
import eventBatchResponse from '@middy/event-batch-response'
import eventBatchHandler from '@middy/event-batch-handler'
const recordHandler = async (message, context) => {
// message.value is the parsed JSON payload; throw to mark it as failed
}
const lambdaHandler = eventBatchHandler(recordHandler)
export const handler = middy()
.use(eventBatchParser({ value: parseJson() }))
.use(eventBatchResponse())
.handler(lambdaHandler) Example Avro
import middy from '@middy/core'
import eventBatchParser, { parseAvro } from '@middy/event-batch-parser'
import eventBatchResponse from '@middy/event-batch-response'
import eventBatchHandler from '@middy/event-batch-handler'
const schema = { type: 'record', name: 'Message', fields: [
{ name: 'id', type: 'string' },
{ name: 'payload', type: 'string' },
] }
const recordHandler = async (message, context) => {
// message.value is the decoded Avro object
}
const lambdaHandler = eventBatchHandler(recordHandler)
export const handler = middy()
.use(eventBatchParser({ value: parseAvro({ schema }) }))
.use(eventBatchResponse())
.handler(lambdaHandler) For dynamic schemas resolved via @middy/glue-schema-registry, omit schema and chain the registry middleware.
Example Protobuf
Per-record schemas are resolved dynamically from the AWS Glue Schema Registry. Each Glue-framed record carries a SchemaVersionId that the registry middleware fetches (and caches) before parseProtobuf runs.
import middy from '@middy/core'
import glueSchemaRegistry from '@middy/glue-schema-registry'
import eventBatchParser, { parseProtobuf } from '@middy/event-batch-parser'
import eventBatchResponse from '@middy/event-batch-response'
import eventBatchHandler from '@middy/event-batch-handler'
const recordHandler = async (message, context) => {
// message.value is the decoded Protobuf message (as JSON)
}
const lambdaHandler = eventBatchHandler(recordHandler)
export const handler = middy()
.use(glueSchemaRegistry())
.use(eventBatchParser({ value: parseProtobuf(), glueSchemaRegistry: {} }))
.use(eventBatchResponse())
.handler(lambdaHandler) With Durable Functions
Same partial-batch + per-partition offset behavior as MSK. Wrapping in withDurableExecution checkpoints each message so prior work isn’t re-run when offsets within a partition are retried out of order.
import { withDurableExecution } from '@aws/durable-execution-sdk-js'
import middy from '@middy/core'
import eventBatchParser, { parseJson } from '@middy/event-batch-parser'
import eventBatchResponse from '@middy/event-batch-response'
import eventBatchHandler from '@middy/event-batch-handler'
const recordHandler = async (message, ctx) => {
await ctx.step('process', async () => process(message.value))
}
const lambdaHandler = eventBatchHandler(recordHandler)
export const handler = withDurableExecution(
middy()
.use(eventBatchParser({ value: parseJson() }))
.use(eventBatchResponse())
.handler(lambdaHandler)
) Last updated: