
Appendix G. Event Representation: Avro Events (org.apache.avro.generic.GenericData.Record)

This section provides information for using Avro to represent events.

An event can be represented by an Avro GenericData.Record instance. Event properties of Avro events are the field values of a GenericData.Record. The top level schema must always be a record.

The advantages of supporting Avro as an event representation are:

Similar to the Map and object-array event type, the Avro event type takes part in the comprehensive type system that can eliminate the need to use Java classes as event types, thereby making it easier to change types at runtime or generate or import/export type information from/to another source/destination.

The engine can process Avro's GenericData.Record events via the sendEventAvro(Object avroGenericDataDotRecord, String avroEventTypeName) method on the EPRuntime interface.

The engine does not validate Avro events. Your application must ensure that Avro values match the declaration of the schema and that the schema of the event matches the schema declared for the event type of the event.

A given Avro event type can have only a single supertype, which must also be an Avro event type. All properties available on the Avro supertype are also available on the type itself. In addition, anywhere within EPL that an event type name of an Avro supertype is used, the Avro subtype and the subtype of the subtype match that expression. Note that access to properties is by field position; therefore subtype and supertype field positions should be congruent.
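The positional requirement can be illustrated without Avro or Esper. The sketch below is an assumption-laden reading of "congruent": it treats field positions as congruent when the supertype's fields appear, in the same order, as the leading fields of the subtype. The class and method names are hypothetical and are not part of the Esper API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Esper: checks one reading of "congruent" field positions.
final class FieldPositionSketch {

    // Returns true when every supertype field sits at the same index in the subtype,
    // that is, when the supertype field list is a prefix of the subtype field list.
    static boolean positionsCongruent(List<String> superFields, List<String> subFields) {
        if (subFields.size() < superFields.size()) {
            return false;
        }
        return subFields.subList(0, superFields.size()).equals(superFields);
    }

    public static void main(String[] args) {
        List<String> base = Arrays.asList("carId", "direction");
        // Subtype appends a field after the inherited ones: positions line up.
        System.out.println(positionsCongruent(base, Arrays.asList("carId", "direction", "speed")));
        // Subtype reorders the inherited fields: positional access would read the wrong field.
        System.out.println(positionsCongruent(base, Arrays.asList("direction", "carId", "speed")));
    }
}
```

A check of this kind could run in application code before registering a subtype schema, since the engine does not validate Avro events.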

In order to use Avro for incoming events, the event type name and Avro schema must be made known to the engine via configuration or create avro schema EPL syntax. Please see examples in Section 5.15, “Declaring an Event Type: Create Schema” and Section 15.4.4, “Events Represented by Avro GenericData.Record”.

The code snippet below defines an Avro event type, creates an Avro event and sends the event into the engine. The sample defines the CarLocUpdateEvent event type via the runtime configuration interface (create schema or static configuration could have been used instead).

// Define CarLocUpdateEvent event type (example for runtime-configuration interface)
Schema schema = record("CarLocUpdateEvent").fields()
  .name("carId").type().stringBuilder().prop(PROP_JAVA_STRING_KEY, PROP_JAVA_STRING_VALUE).endString().noDefault()
  .requiredInt("direction")
  .endRecord();
ConfigurationEventTypeAvro avroEvent = new ConfigurationEventTypeAvro(schema);
epService.getEPAdministrator().getConfiguration().addEventTypeAvro("CarLocUpdateEvent", avroEvent);

The CarLocUpdateEvent can now be used in a statement:

select count(*) from CarLocUpdateEvent(direction = 1)#time(1 min)

The sample code to send an event is:

GenericData.Record event = new GenericData.Record(schema);
event.put("carId", "A123456");
event.put("direction", 1);
epService.getEPRuntime().sendEventAvro(event, "CarLocUpdateEvent");

Use the @EventRepresentation(avro) annotation to obtain Avro output events.

Avro schemas can contain further Avro schemas. The engine tracks nested schemas based on the schema name. For nested simple and indexed properties, the engine implicitly registers an event type for each schema using the schema name. Therefore the engine requires a schema to match the initially-registered schema of the same name.

For example, consider the schema:

{
  "type" : "record",
  "name" : "MyEvent",
  "fields" : [ {
    "name" : "nested",
    "type" : {
      "type" : "record",
      "name" : "MyNestedEvent",
      "fields" : [ {
        "name" : "value",
        "type" : "int"
      } ]
    }
  } ]
}

For the above schema, upon registration of the schema as an event type, the engine creates an event type MyNestedEvent and associates it to the MyNestedEvent schema.

Upon registering an Avro event type, the engine determines property names and property types. The Avro record field schema determines the property type.

The table below describes Avro field schema to property type mapping:


For unions:

  1. If the union contains null and any of the primitive types, the property type is the boxed type. For example unionOf().nullType().and().intType().endUnion() is Integer.class.

  2. If the union contains null and numeric types only, the property type is Number.class. For example unionOf().longType().and().intType().endUnion() is Number.class.

  3. Otherwise the property type is Object.class.
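The three union rules can be sketched in plain Java. This is an illustration only, not the engine's implementation: the `AvroKind` enum and the `UnionPropertyTypeSketch` class are hypothetical stand-ins for Avro schema types, and the sketch assumes that rule 1 applies when the union holds null plus exactly one other type, and that rule 2 ignores a null member when checking for numeric types.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for a handful of Avro schema types; not part of Avro or Esper.
enum AvroKind {
    NULL(null, false),
    BOOLEAN(Boolean.class, false),
    INT(Integer.class, true),
    LONG(Long.class, true),
    FLOAT(Float.class, true),
    DOUBLE(Double.class, true),
    STRING(String.class, false);

    final Class<?> boxedType;
    final boolean numeric;

    AvroKind(Class<?> boxedType, boolean numeric) {
        this.boxedType = boxedType;
        this.numeric = numeric;
    }
}

final class UnionPropertyTypeSketch {

    // Applies the three documented union rules in order (assumed interpretation).
    static Class<?> propertyType(Set<AvroKind> union) {
        EnumSet<AvroKind> nonNull = EnumSet.noneOf(AvroKind.class);
        nonNull.addAll(union);
        boolean hasNull = nonNull.remove(AvroKind.NULL);

        // Rule 1: null plus exactly one other type yields that type's boxed class.
        if (hasNull && nonNull.size() == 1) {
            return nonNull.iterator().next().boxedType;
        }
        // Rule 2: all remaining members are numeric, so the common type is Number.
        if (!nonNull.isEmpty() && nonNull.stream().allMatch(kind -> kind.numeric)) {
            return Number.class;
        }
        // Rule 3: anything else is Object.
        return Object.class;
    }

    public static void main(String[] args) {
        System.out.println(propertyType(EnumSet.of(AvroKind.NULL, AvroKind.INT)));   // class java.lang.Integer
        System.out.println(propertyType(EnumSet.of(AvroKind.LONG, AvroKind.INT)));   // class java.lang.Number
        System.out.println(propertyType(EnumSet.of(AvroKind.INT, AvroKind.STRING))); // class java.lang.Object
    }
}
```

Unions that the rules above do not mention, such as a union with a single non-null member, are not handled specially in this sketch.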

This section lists for each JVM type the default Avro schema that the engine uses when assembling an Avro schema from a select-clause.

For example, consider the following EPL statement. The statement assumes that MyEvent is a pre-registered event type of any kind (Map, Avro, Object-Array, POJO etc.):

@EventRepresentation(avro) select 1 as carId, 'abc' as carType from MyEvent

Your application may obtain the schema for the statement output event type as follows:

String epl = "@EventRepresentation(avro) select 1 as carId, 'abc' as carType from MyEvent";
EPStatement stmt = epService.getEPAdministrator().createEPL(epl);
Schema schema = (Schema) ((AvroSchemaEventType) stmt.getEventType()).getSchema();

The engine generates an Avro schema based on the expressions in the select-clause. The schema in pretty-print may look like this:

{
  "type" : "record",
  "name" : "anonymous_1_result_",
  "fields" : [ {
    "name" : "carId",
    "type" : "int"
  }, {
    "name" : "carType",
    "type" : {
      "type" : "string",
      "avro.java.string" : "String"
    }
  } ]
}

Please consult Section 15.4.14.2, “Avro Settings” for details on controlling the default mapping.

By default the engine maps expression result types to Avro schema using non-null schema types. By default, for String-type values, the engine sets the avro.java.string property to String to ensure that Avro uses java.lang.String to represent strings (and not org.apache.avro.util.Utf8). The tables below outline the default mapping and provide alternative schemas, which apply according to Esper Avro settings.

The mapping from primitive and string type to Avro schema is:


The mapping from array-type to Avro schema is:


Additional mappings to Avro schema are:


Esper provides the @AvroSchemaField annotation to assign a schema to a given property. The annotation requires the name attribute for the property name and the schema attribute for the Avro schema text.

The schema provided via @AvroSchemaField for a given property replaces the Avro schema that is otherwise assigned according to the above mapping.

The annotation can be used with create-schema or with @EventRepresentation(avro).

In this example the carId property is a union of int-type and string-type.

@AvroSchemaField(name='carId',schema='["int","string"]') create avro schema MyEvent(carId object)

The engine determines the property type from the Avro field schema according to the rules listed above.

In the default configuration only the primitive data types and the above-mentioned classes have a corresponding Avro schema. When the engine encounters a class for which it does not know the Avro schema that it should use, it fails the EPL statement validation.

For example, the below EPL is invalid:

// Invalid since LocalDateTime has no equivalent Avro schema (by default)
create avro schema MyEvent(ldt as java.time.LocalDateTime)

Instead of using @AvroSchemaField your application can configure a type-representation mapper class that can return the Avro schema to use. For configuration information please see Section 15.4.14.2, “Avro Settings” and the JavaDoc.

Your application can implement the com.espertech.esper.client.hook.TypeRepresentationMapper interface. The engine invokes the provided mapper to determine the Avro schema for a given field.

For example, the following type mapper implementation maps LocalDateTime to the Avro string type.

public class MyTypeRepresentationMapper implements TypeRepresentationMapper {
  public Object map(TypeRepresentationMapperContext context) {
    if (context.getClazz() == LocalDateTime.class) {
      return builder().stringBuilder().endString();
    }
    return null;
  }
}

The engine can automatically widen and assign values to Avro fields. When your application requires custom logic to convert, widen, coerce or transform a value before assignment to an Avro field, please use the mechanism below.

Your application can implement the com.espertech.esper.client.hook.ObjectValueTypeWidenerFactory interface. The engine invokes the provided factory to determine a widener for values.

For example, the factory implementation below returns a type widener that converts LocalDateTime instances to Avro string-type values by using the date-time formatter:

public static class MyObjectValueTypeWidenerFactory implements ObjectValueTypeWidenerFactory {

  public TypeWidener make(ObjectValueTypeWidenerFactoryContext context) {
    if (context.getClazz() == LocalDateTime.class) {
      return new TypeWidener() {
        public Object widen(Object input) {
          LocalDateTime ldt = (LocalDateTime) input;
          return DateTimeFormatter.ISO_DATE_TIME.format(ldt);
        }
      };
    }
    return null;
  }
}
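The conversion performed inside the widener can be tried on its own, without Esper on the classpath. The sketch below uses only java.time; the class name `LocalDateTimeWidenSketch` and the `narrow` helper are hypothetical and serve only to show the string produced by the formatter and that it can be parsed back.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical standalone helper; mirrors the body of the widen method shown above.
final class LocalDateTimeWidenSketch {

    // Formats a LocalDateTime as an ISO date-time string, as the widener does.
    static String widen(Object input) {
        LocalDateTime ldt = (LocalDateTime) input;
        return DateTimeFormatter.ISO_DATE_TIME.format(ldt);
    }

    // Reverses the conversion, for example when reading the Avro string field back.
    static LocalDateTime narrow(String text) {
        return LocalDateTime.parse(text, DateTimeFormatter.ISO_DATE_TIME);
    }

    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(2016, 1, 2, 3, 4, 5);
        String text = widen(ldt);
        System.out.println(text);                     // 2016-01-02T03:04:05
        System.out.println(narrow(text).equals(ldt)); // true
    }
}
```

Because a LocalDateTime carries no offset or zone, the formatted string omits them, so consumers of the Avro field need to agree on the time zone by other means.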

To obtain the Avro schema for a given event type, use:

public static Schema getAvroSchema(EventType eventType) {
  return (Schema) ((AvroSchemaEventType) eventType).getSchema();
}

To obtain the Avro schema for a registered event type, you may use:

public static Schema getAvroSchema(EPServiceProvider epService, String eventTypeName) {
  return getAvroSchema(epService.getEPAdministrator().getConfiguration().getEventType(eventTypeName));
}

To obtain the Avro schema for a given event, you may use:

public static Schema getAvroSchema(EventBean event) {
  return getAvroSchema(event.getEventType());
}

To obtain the GenericData.Record for a given event, you may use:

public static GenericData.Record getAvroRecord(EventBean event) {
  return (GenericData.Record) event.getUnderlying();
}

The following limitations apply: