# event-batch-parser

A unified body-parser middleware for Lambda batch event sources. It walks the records of any supported source, base64-decodes them, optionally strips AWS Glue Schema Registry framing (decompressing if needed), then runs a parser of your choice on each record.
Supported sources:

- Kafka — Amazon MSK (`aws:kafka`) and self-managed (`SelfManagedKafka`) — per-field config: `key` and/or `value`
- Kinesis Data Streams (`aws:kinesis`) — `data`, mapped to `record.kinesis.data`
- Kinesis Firehose (`aws:lambda:events`) — `data`, mapped to `record.data`
- SQS (`aws:sqs`) — `body`, mapped to `record.body`
- ActiveMQ (`aws:amq`) — `data`, mapped to `message.data`
- RabbitMQ (`aws:rmq`) — `data`, mapped to `message.data`
Each non-Kafka source supports exactly one of `body` or `data` — whichever matches the underlying record field. Configuring the wrong one throws a `TypeError` at startup.
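To make the record walk concrete, here is a minimal, simplified sketch of an MSK (`aws:kafka`) batch event and the base64 decoding step the middleware performs before invoking the configured parser. The event shape follows AWS's documented Kafka event structure, trimmed to the relevant fields; the topic name and payload are made up for illustration.

```javascript
// Simplified MSK event: records are keyed by topic-partition, and the
// per-record key/value fields arrive base64-encoded.
const mskEvent = {
  eventSource: 'aws:kafka',
  records: {
    'my-topic-0': [
      {
        topic: 'my-topic',
        partition: 0,
        offset: 42,
        key: Buffer.from('user-1').toString('base64'),
        value: Buffer.from(JSON.stringify({ id: '1', name: 'Ada' })).toString('base64')
      }
    ]
  }
}

// What the middleware does for each record before handing off to a parser:
const record = mskEvent.records['my-topic-0'][0]
const decodedKey = Buffer.from(record.key, 'base64').toString('utf-8')
const decodedValue = JSON.parse(Buffer.from(record.value, 'base64').toString('utf-8'))
```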
## Install
```bash
npm install --save @middy/event-batch-parser

# Pick the format(s) you need
npm install --save avro-js
npm install --save protobufjs

# Optional: dynamic schemas via AWS Glue Schema Registry
npm install --save @middy/glue-schema-registry
npm install --save-dev @aws-sdk/client-glue
```

## Options
- `key` (function) (Kafka only): Parser to apply to each record's `key`. Use one of `parseJson()`, `parseAvro({...})`, `parseProtobuf({...})`.
- `value` (function) (Kafka only): Parser to apply to each record's `value`.
- `body` (function) (SQS only): Parser to apply to `record.body`.
- `data` (function) (Kinesis / Firehose / MQ): Parser to apply to the source-specific data field (`record.kinesis.data`, `record.data`, or `message.data`).
- `disableEventSourceError` (boolean) (default `false`): If `true`, unknown event sources are skipped silently instead of throwing.
- `maxDecompressedBytes` (integer) (default `10485760`, i.e. 10 MiB): Cap on the decompressed size of any single Glue-framed (`0x05` zlib) record payload. Bounds zlib output to defend against compression-bomb DoS from external producers. Exceeding the cap throws an HTTP 413 error.
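The decompression cap is easy to demonstrate with Node's built-in `zlib`, whose `maxOutputLength` option bounds inflate output. Whether the middleware uses exactly this mechanism is an assumption; the sketch only shows why bounding output matters:

```javascript
import zlib from 'node:zlib'

// A 1 MiB buffer of zeros deflates to roughly a kilobyte: a cheap
// "compression bomb". Inflating with a small maxOutputLength throws
// instead of allocating the full megabyte.
const bomb = zlib.deflateSync(Buffer.alloc(1024 * 1024))

let err
try {
  zlib.inflateSync(bomb, { maxOutputLength: 1024 }) // 1 KiB cap
} catch (e) {
  err = e
}
// err is now set; the middleware would surface such a breach as HTTP 413
```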
## Parser exports

### `parseJson({ reviver? })`

Parses each record body as JSON. Equivalent to `JSON.parse(buffer.toString('utf-8'), reviver)`.
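Since `parseJson` is defined as a plain `JSON.parse` over the UTF-8 decoded buffer, its behavior, including the optional `reviver`, can be reproduced directly. The date-reviving reviver below is illustrative, not part of the middleware:

```javascript
// A record body as it looks after base64 decoding
const buffer = Buffer.from('{"id":7,"createdAt":"2024-05-01T12:00:00.000Z"}')

// Illustrative reviver: turn the createdAt string into a Date
const reviver = (key, value) =>
  key === 'createdAt' ? new Date(value) : value

// Equivalent to what parseJson({ reviver }) applies per record
const parsed = JSON.parse(buffer.toString('utf-8'), reviver)
```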
### `parseAvro({ schema?, internalKey? })`

Decodes Avro-encoded payloads using `avro-js`.

- `schema`: a static Avro schema (string or object).
- `internalKey`: name of a `request.internal` entry populated by `@middy/glue-schema-registry`'s `fetchData`. The entry's `schemaDefinition` is used.
### `parseProtobuf({ root?, messageType?, internalKey? })`

Decodes Protobuf-encoded payloads using `protobufjs`.

- `root` and `messageType`: a loaded `protobuf.Root` and the fully-qualified message type name (the static path).
- `internalKey`: name of a `request.internal` entry containing `{ root, messageType }`.
## Sample usage
### Kafka with static Avro schema

```javascript
import middy from '@middy/core'
import eventBatchParser from '@middy/event-batch-parser'
import parseAvro from '@middy/event-batch-parser/parseAvro'

const userSchema = {
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'name', type: 'string' }
  ]
}

export const handler = middy()
  .use(eventBatchParser({ value: parseAvro({ schema: userSchema }) }))
  .handler(async (event) => {
    for (const records of Object.values(event.records)) {
      for (const record of records) {
        // record.value is now { id, name }
      }
    }
  })
```

### Kinesis with Glue Schema Registry (schema fetched at startup, exposed on internal)
```javascript
import middy from '@middy/core'
import glueSchemaRegistry from '@middy/glue-schema-registry'
import eventBatchParser from '@middy/event-batch-parser'
import parseAvro from '@middy/event-batch-parser/parseAvro'

export const handler = middy()
  .use(glueSchemaRegistry({
    fetchData: { userSchema: { SchemaVersionId: '...' } }
  }))
  .use(eventBatchParser({
    data: parseAvro({ internalKey: 'userSchema' })
  }))
  .handler(async (event) => {
    for (const record of event.Records) {
      // record.kinesis.data is now the decoded JS object
    }
  })
```

### SQS with JSON
```javascript
import middy from '@middy/core'
import eventBatchParser from '@middy/event-batch-parser'
import parseJson from '@middy/event-batch-parser/parseJson'

export const handler = middy()
  .use(eventBatchParser({ body: parseJson() }))
  .handler(async (event) => {
    for (const record of event.Records) {
      // record.body is now the parsed JSON value
    }
  })
```

## Glue framing
When a record's base64-decoded buffer starts with byte `0x03`, the middleware treats it as AWS Glue Schema Registry framing:
```
byte 0     : header version (0x03)
byte 1     : compression (0x00 raw, 0x05 zlib)
bytes 2-17 : SchemaVersionId UUID
bytes 18+  : payload (Avro/Protobuf/JSON-Schema-encoded)
```

The middleware sets `record._schemaVersionId` (canonical UUID with dashes) and `record._payload` (decompressed bytes after the prefix). Parsers read these properties when present and fall back to the full buffer otherwise.
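The framing logic described above can be sketched in a few lines of Node. `parseGlueFraming` and the constant below are illustrative names, not exports of this package, and using zlib's `maxOutputLength` to enforce the size cap is an assumption mirroring the `maxDecompressedBytes` option:

```javascript
import zlib from 'node:zlib'

const MAX_DECOMPRESSED_BYTES = 10 * 1024 * 1024 // mirrors maxDecompressedBytes

// Illustrative helper: splits a Glue-framed buffer into its schema
// version id and (decompressed) payload, per the byte layout above.
function parseGlueFraming (buf, maxBytes = MAX_DECOMPRESSED_BYTES) {
  if (buf[0] !== 0x03) return null // not Glue-framed: caller uses the buffer as-is

  const compression = buf[1] // 0x00 raw, 0x05 zlib
  const hex = buf.subarray(2, 18).toString('hex')
  const schemaVersionId = [
    hex.slice(0, 8), hex.slice(8, 12), hex.slice(12, 16),
    hex.slice(16, 20), hex.slice(20)
  ].join('-') // canonical dashed UUID

  let payload = buf.subarray(18)
  if (compression === 0x05) {
    // maxOutputLength bounds the inflated size (compression-bomb defense)
    payload = zlib.inflateSync(payload, { maxOutputLength: maxBytes })
  }
  return { schemaVersionId, payload }
}
```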