This section provides information for using Avro to represent events.
An event can be represented by an Avro GenericData.Record instance. Event properties of Avro events are the field values of a GenericData.Record. The top level schema must always be a record.
The advantages of supporting Avro as an event representation are:
Avro has excellent support for JSON, allowing JSON for incoming and outgoing events, while not compromising on type-safety since Avro provides a schema.
Avro has a rich, extensible, standardized schema language defined in pure JSON; event types and schemas can be defined, imported and exported with EPL or from external sources.
Avro offers a compact binary representation and is thus efficient and fast for use with EsperHA persistence or for input/output in wire transfer.
Avro has a compact event representation reducing memory use, as each event is only a schema-reference and an Object[] (see GenericData.Record).
JSON itself is not memory efficient while Avro is: JSON repeats every field name with every single record and JSON alone is inefficient for high-volume usage.
Avro allows fast access to event properties since reading an event property value only requires reading the GenericData.Record-internal object-array at a given index.
Avro has bindings for a wide variety of programming languages and platforms and has RPC and file representations.
Avro does not require code generation so EPL can be written generically for any data stream. Type information can be made available at runtime while still providing type-safety. There is no need to generate code, therefore there is no need to manage generated classes, or to reload classes or to restart the process to reload classes.
Avro has the notion of schema compatibility for evolving your event data over time.
Similar to the Map and object-array event type, the Avro event type takes part in the comprehensive type system that can eliminate the need to use Java classes as event types, thereby making it easier to change types at runtime or generate or import/export type information from/to another source/destination.
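The memory and access-speed points above can be pictured with a minimal sketch. The classes MiniSchema and MiniRecord are hypothetical stand-ins written for this illustration only; they are not Avro classes and they omit types, defaults and validation. The sketch shows the general idea of an event that holds a shared schema reference plus an Object[], with reads going through a position that is resolved once.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for a schema: only the ordered field names (hypothetical, not Avro's Schema).
final class MiniSchema {
    private final List<String> fieldNames;

    MiniSchema(String... fieldNames) {
        this.fieldNames = Arrays.asList(fieldNames);
    }

    // Resolve a field name to its array position; this can be done once and reused for every event.
    int position(String fieldName) {
        int index = fieldNames.indexOf(fieldName);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown field: " + fieldName);
        }
        return index;
    }

    int size() {
        return fieldNames.size();
    }
}

// Simplified stand-in for a record: a schema reference and one array slot per field.
final class MiniRecord {
    private final MiniSchema schema;  // shared by all events of the same type
    private final Object[] values;    // field names are not repeated per event

    MiniRecord(MiniSchema schema) {
        this.schema = schema;
        this.values = new Object[schema.size()];
    }

    void put(String fieldName, Object value) {
        values[schema.position(fieldName)] = value;
    }

    // Reading by a pre-resolved position is a plain array access.
    Object get(int position) {
        return values[position];
    }
}
```

With this layout, per-event memory is one array plus one reference, in contrast to a JSON document that repeats every field name in every record.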
The engine can process Avro's GenericData.Record events via the sendEventAvro(Object avroGenericDataDotRecord, String avroEventTypeName) method on the EPRuntime interface.
The engine does not validate Avro events. Your application must ensure that Avro values match the declaration of the schema and that the schema of the event matches the schema declared for the event type of the event.
A given Avro event type can have only a single supertype, which must also be an Avro event type. All properties available on the Avro supertype are also available on the type itself. In addition, anywhere within EPL that an event type name of an Avro supertype is used, the Avro subtype and the subtype of the subtype match that expression. Note that access to properties is by field position; subtype and supertype field positions should therefore be congruent.
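Because properties are read by field position, a subtype record is only readable through the supertype if the supertype fields sit at the same positions in both schemas. The sketch below is one possible reading of "congruent": the supertype field names form a prefix of the subtype field names. The class FieldPositionCheck and its method are hypothetical helpers for illustration and are not part of Esper or Avro.

```java
import java.util.List;

final class FieldPositionCheck {
    // Returns true when every supertype field sits at the same index in the subtype.
    // Assumption: congruence is interpreted as "supertype fields are a prefix of subtype fields".
    static boolean isCongruent(List<String> superFields, List<String> subFields) {
        if (subFields.size() < superFields.size()) {
            return false;
        }
        return subFields.subList(0, superFields.size()).equals(superFields);
    }
}
```

For example, a subtype that appends a speed field after carId and direction passes the check, while a subtype that places speed first does not, since position 0 would then hold a different field than in the supertype.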
In order to use Avro for incoming events, the event type name and Avro schema must be made known to the engine via configuration or create avro schema EPL syntax. Please see examples in Section 5.15, “Declaring an Event Type: Create Schema” and Section 15.4.4, “Events Represented by Avro GenericData.Record”.
The code snippet below defines an Avro event type, creates an Avro event and sends the event into the engine. The sample defines the CarLocUpdateEvent event type via the runtime configuration interface (create schema or static configuration could have been used instead).
    // Define CarLocUpdateEvent event type (example for runtime-configuration interface)
    Schema schema = record("CarLocUpdateEvent").fields()
      .name("carId").type().stringBuilder().prop(PROP_JAVA_STRING_KEY, PROP_JAVA_STRING_VALUE).endString().noDefault()
      .requiredInt("direction")
      .endRecord();
    ConfigurationEventTypeAvro avroEvent = new ConfigurationEventTypeAvro(schema);
    epService.getEPAdministrator().getConfiguration().addEventTypeAvro("CarLocUpdateEvent", avroEvent);
The CarLocUpdateEvent event type can now be used in a statement:
select count(*) from CarLocUpdateEvent(direction = 1)#time(1 min)
The sample code to send an event is:
    GenericData.Record event = new GenericData.Record(schema);
    event.put("carId", "A123456");
    event.put("direction", 1);
    epService.getEPRuntime().sendEventAvro(event, "CarLocUpdateEvent");
Use the @EventRepresentation(avro) annotation to obtain Avro output events.
Avro schemas can contain further Avro schemas. The engine tracks nested schemas based on the schema name. For nested simple and indexed properties, the engine implicitly registers an event type for each nested schema using the schema name. Therefore the engine requires each nested schema to match the initially-registered schema of the same name.
For example, the schema:
    {
      "type" : "record",
      "name" : "MyEvent",
      "fields" : [ {
        "name" : "nested",
        "type" : {
          "type" : "record",
          "name" : "MyNestedEvent",
          "fields" : [ {
            "name" : "value",
            "type" : "int"
          } ]
        }
      } ]
    }
For the above schema, upon registration of the schema as an event type, the engine creates an event type MyNestedEvent and associates it to the MyNestedEvent schema.
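The name-based tracking of nested schemas can be sketched as a small registry. Everything in this sketch is hypothetical: RecordDef is a simplified description of a record (a name, a textual field signature and nested records), and throwing an exception on a mismatch is an assumption made for illustration, since the text above only states that names must match the initially-registered schema.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NestedSchemaRegistrySketch {
    // Hypothetical, simplified record description; not an Avro or Esper class.
    static final class RecordDef {
        final String name;
        final String fieldSignature;
        final List<RecordDef> nested;

        RecordDef(String name, String fieldSignature, RecordDef... nested) {
            this.name = name;
            this.fieldSignature = fieldSignature;
            this.nested = Arrays.asList(nested);
        }
    }

    // The first registration of a name wins; later registrations must agree with it.
    private final Map<String, String> signatureByName = new LinkedHashMap<>();

    void register(RecordDef def) {
        String existing = signatureByName.putIfAbsent(def.name, def.fieldSignature);
        if (existing != null && !existing.equals(def.fieldSignature)) {
            // Assumed reaction to a mismatch; the engine's actual behavior is not described here.
            throw new IllegalStateException("Schema '" + def.name + "' differs from the registered schema");
        }
        for (RecordDef child : def.nested) {
            register(child);  // nested records are registered under their own names
        }
    }

    boolean isRegistered(String name) {
        return signatureByName.containsKey(name);
    }
}
```

Registering MyEvent from the example would also register MyNestedEvent, and a later schema that reuses the name MyNestedEvent with different fields would be rejected by this sketch.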
Upon registering an Avro event type, the engine determines property names and property types. The Avro record field schema determines the property type.
The table below describes Avro field schema to property type mapping:
Table G.1. Avro Field Schema to Property Type Mapping
Schema | Property Type |
---|---|
"int" (Schema.Type.INT) | int |
"long" (Schema.Type.LONG) | long |
"double" (Schema.Type.DOUBLE) | double |
"float" (Schema.Type.FLOAT) | float |
"boolean" (Schema.Type.BOOLEAN) | boolean |
"bytes" (Schema.Type.BYTES) | ByteBuffer |
"null" (Schema.Type.NULL) | null |
"string" (Schema.Type.STRING) | java.lang.CharSequence; if the field has the property avro.java.string set to String: java.lang.String |
"union" (Schema.Type.UNION) | See below. |
"array" (Schema.Type.ARRAY) | java.util.Collection |
"map" (Schema.Type.MAP) | java.util.Map |
"record" (Schema.Type.RECORD) | GenericData.Record |
"fixed" (Schema.Type.FIXED) | GenericFixed |
"enum" (Schema.Type.ENUM) | GenericEnumSymbol |
For unions:
If the union contains null and any of the primitive types, the property type is the boxed type. For example unionOf().nullType().and().intType().endUnion() is Integer.class.
If the union contains null and numeric types only, the property type is Number.class. For example unionOf().longType().and().intType().endUnion() is Number.class.
Otherwise the property type is Object.class.
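The union rules can be sketched as a small decision function. This is an interpretation, not the engine's code: the class UnionPropertyType is hypothetical, union members are given as plain Avro type names, only int, long, float, double and boolean are treated as primitives, and null is treated as optional in the numeric rule because the Number.class example above contains no null. Cases outside these assumptions fall through to Object.class and may differ from what the engine reports.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class UnionPropertyType {
    private static final Map<String, Class<?>> BOXED = new HashMap<>();
    private static final Set<String> NUMERIC =
        new HashSet<>(Arrays.asList("int", "long", "float", "double"));

    static {
        BOXED.put("int", Integer.class);
        BOXED.put("long", Long.class);
        BOXED.put("float", Float.class);
        BOXED.put("double", Double.class);
        BOXED.put("boolean", Boolean.class);
    }

    // unionTypeNames holds the Avro type names of the union members, for example ["null", "int"].
    static Class<?> propertyType(List<String> unionTypeNames) {
        Set<String> nonNull = new HashSet<>(unionTypeNames);
        boolean hasNull = nonNull.remove("null");

        // Rule 1: null plus a single primitive type yields the boxed type.
        if (hasNull && nonNull.size() == 1) {
            Class<?> boxed = BOXED.get(nonNull.iterator().next());
            if (boxed != null) {
                return boxed;
            }
        }
        // Rule 2: numeric types only (null treated as optional, an assumption) yields Number.
        if (!nonNull.isEmpty() && NUMERIC.containsAll(nonNull)) {
            return Number.class;
        }
        // Rule 3: anything else.
        return Object.class;
    }
}
```

Under these assumptions, ["null","int"] yields Integer.class, ["long","int"] yields Number.class and ["int","string"] yields Object.class.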
This section lists for each JVM type the default Avro schema that the engine uses when assembling an Avro schema from a select-clause.
For example, consider the following EPL statement. The statement assumes that MyEvent is a pre-registered event type of any kind (Map, Avro, Object-Array, POJO etc.):
@EventRepresentation(avro) select 1 as carId, 'abc' as carType from MyEvent
Your application may obtain the schema for the statement output event type as follows:
    String epl = "@EventRepresentation(avro) select 1 as carId, 'abc' as carType from MyEvent";
    EPStatement stmt = epService.getEPAdministrator().createEPL(epl);
    Schema schema = (Schema) ((AvroSchemaEventType) stmt.getEventType()).getSchema();
The engine generates an Avro schema based on the expressions in the select-clause. The schema in pretty-print may look like this:
    {
      "type" : "record",
      "name" : "anonymous_1_result_",
      "fields" : [ {
        "name" : "carId",
        "type" : "int"
      }, {
        "name" : "carType",
        "type" : {
          "type" : "string",
          "avro.java.string" : "String"
        }
      } ]
    }
Please consult Section 15.4.14.2, “Avro Settings” for details on controlling the default mapping.
By default the engine maps expression result types to Avro schema using non-null schema types.
By default, for String-type values, the engine sets the avro.java.string property to String to ensure that Avro uses java.lang.String to represent strings (and not org.apache.avro.util.Utf8).
The tables below outline the default mapping and provide alternative schemas, which apply according to Esper Avro settings.
The mapping from primitive and string type to Avro schema is:
Table G.2. Primitive Data Type and String Mapping
Type | Default Schema | Alternative Schemas |
---|---|---|
byte | "int" | N/A |
java.lang.Byte | "int" | ["null","int"] |
boolean | "boolean" | N/A |
java.lang.Boolean | "boolean" | ["null","boolean"] |
double | "double" | N/A |
java.lang.Double | "double" | ["null","double"] |
float | "float" | N/A |
java.lang.Float | "float" | ["null","float"] |
int | "int" | N/A |
java.lang.Integer | "int" | ["null","int"] |
long | "long" | N/A |
java.lang.Long | "long" | ["null","long"] |
null | "null" | N/A |
java.lang.String and java.lang.CharSequence | {"type":"string","avro.java.string":"String"} | "string" or ["null","string"] or ["null",{"type":"string","avro.java.string":"String"}] |
The mapping from array-type to Avro schema is:
Table G.3. Array Type Mapping
Type | Default Schema | Alternative Schemas |
---|---|---|
byte[] | "bytes" | ["null","bytes"] |
Byte[] | {"type":"array","items":["null","int"]} | ["null",{"type":"array","items":["null","int"]}] |
boolean[] | {"type":"array","items":"boolean"} | ["null",{"type":"array","items":"boolean"}] |
Boolean[] | {"type":"array","items":["null","boolean"]} | ["null",{"type":"array","items":["null","boolean"]}] |
double[] | {"type":"array","items":"double"} | ["null",{"type":"array","items":"double"}] |
Double[] | {"type":"array","items":["null","double"]} | ["null",{"type":"array","items":["null","double"]}] |
float[] | {"type":"array","items":"float"} | ["null",{"type":"array","items":"float"}] |
Float[] | {"type":"array","items":["null","float"]} | ["null",{"type":"array","items":["null","float"]}] |
int[] | {"type":"array","items":"int"} | ["null",{"type":"array","items":"int"}] |
Integer[] | {"type":"array","items":["null","int"]} | ["null",{"type":"array","items":["null","int"]}] |
long[] | {"type":"array","items":"long"} | ["null",{"type":"array","items":"long"}] |
Long[] | {"type":"array","items":["null","long"]} | ["null",{"type":"array","items":["null","long"]}] |
java.lang.String[] and java.lang.CharSequence[] | {"type":"array","items":{"type":"string","avro.java.string":"String"}} | ["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}] or {"type":"array","items":"string"} (or the combination) |
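The pattern behind the Default Schema columns of Tables G.2 and G.3 can be summarized in a short lookup sketch. The class DefaultSchemaSketch is hypothetical and covers only the types listed in those two tables with their default schemas; it ignores the alternative schemas and any Avro settings, and it returns schema text rather than Avro Schema objects. Arrays of primitives use the primitive schema for items, arrays of boxed types use a union with "null" for items, and byte[] maps to "bytes".

```java
import java.util.HashMap;
import java.util.Map;

final class DefaultSchemaSketch {
    private static final String JAVA_STRING =
        "{\"type\":\"string\",\"avro.java.string\":\"String\"}";
    private static final Map<Class<?>, String> SCALARS = new HashMap<>();

    static {
        SCALARS.put(byte.class, "\"int\"");
        SCALARS.put(Byte.class, "\"int\"");
        SCALARS.put(boolean.class, "\"boolean\"");
        SCALARS.put(Boolean.class, "\"boolean\"");
        SCALARS.put(double.class, "\"double\"");
        SCALARS.put(Double.class, "\"double\"");
        SCALARS.put(float.class, "\"float\"");
        SCALARS.put(Float.class, "\"float\"");
        SCALARS.put(int.class, "\"int\"");
        SCALARS.put(Integer.class, "\"int\"");
        SCALARS.put(long.class, "\"long\"");
        SCALARS.put(Long.class, "\"long\"");
    }

    private static boolean isText(Class<?> type) {
        return CharSequence.class.isAssignableFrom(type);
    }

    private static String scalar(Class<?> type) {
        String schema = isText(type) ? JAVA_STRING : SCALARS.get(type);
        if (schema == null) {
            // Mirrors the documented default: types without a known schema are rejected.
            throw new IllegalArgumentException("No default Avro schema for " + type.getName());
        }
        return schema;
    }

    // Returns the default schema text for a scalar or single-dimension array type.
    static String defaultSchema(Class<?> type) {
        if (type == byte[].class) {
            return "\"bytes\"";
        }
        if (type.isArray()) {
            Class<?> component = type.getComponentType();
            String item = scalar(component);
            // Boxed element types can hold null, so their items become a union with "null".
            boolean nullableItems = !component.isPrimitive() && !isText(component);
            String items = nullableItems ? "[\"null\"," + item + "]" : item;
            return "{\"type\":\"array\",\"items\":" + items + "}";
        }
        return scalar(type);
    }
}
```

For instance, defaultSchema(Integer[].class) produces {"type":"array","items":["null","int"]}, matching the Integer[] row of Table G.3.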
Additional mappings to Avro schema are:
Table G.4. Additional Mapping
Type | Default Schema | Alternative Schemas |
---|---|---|
java.util.Map interface implementation | {"type":"map","values":{"type":"string","avro.java.string":"String"}} | ["null",{"type":"map","values":{"type":"string","avro.java.string":"String"}}] |
Esper provides the @AvroSchemaField annotation to assign a schema to a given property. The annotation requires the name attribute for the property name and the schema attribute for the Avro schema text.
The schema provided via @AvroSchemaField for a given property replaces the Avro schema that is otherwise assigned according to the above mapping.
The annotation can be used with create-schema or with @EventRepresentation(avro).
In this example the carId property is a union of int-type and string-type.
@AvroSchemaField(name='carId',schema='["int","string"]') create avro schema MyEvent(carId object)
The engine determines the property type from the Avro field schema according to the rules listed above.
In the default configuration only the primitive data types and the abovementioned classes have a corresponding Avro schema. When the engine encounters a class for which it does not know the Avro schema that it should use, it fails the EPL statement validation.
For example, the below EPL is invalid:
    // Invalid since LocalDateTime has no equivalent Avro schema (by default)
    create avro schema MyEvent(ldt as java.time.LocalDateTime)
Instead of using @AvroSchemaField your application can configure a type-representation mapper class that can return the Avro schema to use.
For configuration information please see Section 15.4.14.2, “Avro Settings” and the JavaDoc.
Your application can implement the com.espertech.esper.client.hook.TypeRepresentationMapper interface. The engine invokes the provided mapper to determine the Avro schema for a given field.
For example, the following type mapper implementation maps LocalDateTime to the Avro string type.
    public class MyTypeRepresentationMapper implements TypeRepresentationMapper {
      public Object map(TypeRepresentationMapperContext context) {
        // Map LocalDateTime to an Avro string schema
        if (context.getClazz() == LocalDateTime.class) {
          return builder().stringBuilder().endString();
        }
        // No schema provided by this mapper for any other class
        return null;
      }
    }
The engine can automatically widen and assign values to Avro fields. When your application requires custom logic to convert, widen, coerce or transform a value before assignment to an Avro field, please use the mechanism below.
Your application can implement the com.espertech.esper.client.hook.ObjectValueTypeWidenerFactory interface. The engine invokes the provided factory to determine a widener for values.
For example, the factory implementation below returns a type widener that converts LocalDateTime instances to Avro string-type values by using the date-time formatter:
    public static class MyObjectValueTypeWidenerFactory implements ObjectValueTypeWidenerFactory {
      public TypeWidener make(ObjectValueTypeWidenerFactoryContext context) {
        if (context.getClazz() == LocalDateTime.class) {
          return new TypeWidener() {
            public Object widen(Object input) {
              // Format the date-time value as an ISO string for the Avro string field
              LocalDateTime ldt = (LocalDateTime) input;
              return DateTimeFormatter.ISO_DATE_TIME.format(ldt);
            }
          };
        }
        // No custom widener for any other class
        return null;
      }
    }
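To see what value such a widener places into the Avro string field, the conversion can be tried outside the engine. The sketch below uses only the JDK: java.util.function.Function stands in for Esper's TypeWidener interface, the class name LocalDateTimeWidenerSketch is hypothetical, and the null check is an addition that the factory example does not include.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

final class LocalDateTimeWidenerSketch {
    // Stand-in for the widen(Object) callback; Function replaces Esper's TypeWidener here.
    static final Function<Object, Object> WIDENER = input -> {
        if (input == null) {
            return null;  // added for the sketch; not part of the factory example
        }
        return DateTimeFormatter.ISO_DATE_TIME.format((LocalDateTime) input);
    };

    public static void main(String[] args) {
        // prints 2016-01-02T03:04:05
        System.out.println(WIDENER.apply(LocalDateTime.of(2016, 1, 2, 3, 4, 5)));
    }
}
```

Since a LocalDateTime carries no offset or zone, the ISO_DATE_TIME formatter emits only the local date and time portion.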
To obtain the Avro schema for a given event type, use:
    public static Schema getAvroSchema(EventType eventType) {
      return (Schema) ((AvroSchemaEventType) eventType).getSchema();
    }
To obtain the Avro schema for a registered event type, you may use:
    public static Schema getAvroSchema(EPServiceProvider epService, String eventTypeName) {
      return getAvroSchema(epService.getEPAdministrator().getConfiguration().getEventType(eventTypeName));
    }
To obtain the Avro schema for a given event, you may use:
    public static Schema getAvroSchema(EventBean event) {
      return getAvroSchema(event.getEventType());
    }
To obtain the GenericData.Record for a given event, you may use:
    public static GenericData.Record getAvroRecord(EventBean event) {
      return (GenericData.Record) event.getUnderlying();
    }
The following limitations apply:
An Avro GenericData.Record cannot contain EventBean instances.
There is no implicit translation from other event representations to Avro schemas.
While the engine performs best-effort assignment checking and widening, it does not itself verify that a GenericData.Record contains valid data, either when producing or when consuming a GenericData.Record.