
Appendix G. Event Representation: Avro Events (org.apache.avro.generic.GenericData.Record)

This section provides information for using Avro to represent events.

An event can be represented by an Avro GenericData.Record instance. Event properties of Avro events are the field values of a GenericData.Record. The top level schema must always be a record.

The advantages for supporting Avro as an event representation are:

Similar to the Map and object-array event type, the Avro event type takes part in the comprehensive type system that can eliminate the need to use Java classes as event types.

The runtime can process Avro's GenericData.Record events via the sendEventAvro(Object avroGenericDataDotRecord, String avroEventTypeName) method on the EPEventService interface.

The runtime does not validate Avro events. Your application must ensure that Avro values match the declaration of the schema and that the schema of the event matches the schema declared for the event type of the event.

A given Avro event type can have only a single supertype, which must also be an Avro event type. All properties available on the Avro supertype are also available on the type itself. In addition, anywhere within EPL that an event type name of an Avro supertype is used, the Avro subtype and the subtype of the subtype match that expression. Note that access to properties is by field position; therefore subtype and supertype field positions should be congruent.
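The note on field positions can be illustrated with a small check. This is a minimal sketch that assumes "congruent" means each supertype field sits at the same index in the subtype. The class FieldPositionSketch and the method isPositionCongruent are hypothetical names used for illustration and are not part of Esper or Avro.

```java
import java.util.List;

// Hypothetical helper, not part of Esper or Avro.
// Assumption: "congruent" means every supertype field keeps its index in the subtype.
final class FieldPositionSketch {

  // Returns true when the subtype's leading fields equal the supertype's fields, in order.
  static boolean isPositionCongruent(List<String> supertypeFields, List<String> subtypeFields) {
    if (subtypeFields.size() < supertypeFields.size()) {
      return false;
    }
    return subtypeFields.subList(0, supertypeFields.size()).equals(supertypeFields);
  }

  public static void main(String[] args) {
    List<String> base = List.of("carId", "direction");
    // The subtype appends a field after the inherited ones, so positions are preserved.
    System.out.println(isPositionCongruent(base, List.of("carId", "direction", "speed")));
    // The subtype puts a new field first, which shifts the inherited positions.
    System.out.println(isPositionCongruent(base, List.of("speed", "carId", "direction")));
  }
}
```

Under this assumption, a subtype that appends its own fields after the inherited ones keeps positional access consistent between the two types.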

In order to use Avro for incoming events, the event type name and Avro schema must be made known via configuration or create avro schema EPL syntax. Please see examples in Section 5.15, “Declaring an Event Type: Create Schema” and Section 17.4.6, “Events Represented by Avro GenericData.Record”.

The code snippet below defines an Avro event type, creates an Avro event and sends the event into the runtime. The sample defines the CarLocUpdateEvent event type via the configuration (create schema could have been used instead).

// Define CarLocUpdateEvent event type
Schema schema = record("CarLocUpdateEvent").fields()
  .name("carId").type().stringBuilder().prop(PROP_JAVA_STRING_KEY, PROP_JAVA_STRING_VALUE).endString().noDefault()
  .requiredInt("direction")
  .endRecord();
Configuration configuration = new Configuration();
ConfigurationCommonEventTypeAvro avroEvent = new ConfigurationCommonEventTypeAvro(schema);
configuration.getCommon().addEventTypeAvro("CarLocUpdateEvent", avroEvent);

The CarLocUpdateEvent can now be used in a statement:

select count(*) from CarLocUpdateEvent(direction = 1)#time(1 min)

The sample code to send an event is:

GenericData.Record event = new GenericData.Record(schema);
event.put("carId", "A123456");
event.put("direction", 1);
runtime.getEventService().sendEventAvro(event, "CarLocUpdateEvent");

Use the @EventRepresentation(avro) annotation to obtain Avro output events.

Avro schemas can contain further Avro schemas. The compiler and runtime track nested schema based on the schema name. The compiler and runtime implicitly register an event type for each schema using the schema name, for nested simple and indexed properties. Therefore the compiler and runtime require schema names to match the initially-registered schema of the same name.

For example, the schema:

{
  "type" : "record",
  "name" : "MyEvent",
  "fields" : [ {
    "name" : "nested",
    "type" : {
      "type" : "record",
      "name" : "MyNestedEvent",
      "fields" : [ {
        "name" : "value",
        "type" : "int"
      } ]
    }
  } ]
}

For the above schema, upon registration of the schema as an event type, the compiler or runtime creates an event type MyNestedEvent and associates it with the MyNestedEvent schema.
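The name-based tracking of nested schemas can be pictured as a registry keyed by schema name. The sketch below is an illustration only and is not how the compiler or runtime is implemented. SchemaNameRegistrySketch is a hypothetical class, schemas are held as plain JSON text, and the choice to throw an exception on a mismatch is an assumption, since the text does not say how a mismatch is reported.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Esper code: tracks schemas by name and
// requires later schemas of the same name to equal the first one registered.
final class SchemaNameRegistrySketch {

  // Schema name mapped to the schema text that was registered first under that name.
  private final Map<String, String> schemaByName = new HashMap<>();

  // Registers the schema if the name is new; otherwise checks it against the initial schema.
  void register(String schemaName, String schemaJson) {
    String existing = schemaByName.putIfAbsent(schemaName, schemaJson);
    if (existing != null && !existing.equals(schemaJson)) {
      // Assumption: a mismatch is reported by throwing an exception.
      throw new IllegalStateException(
          "Schema '" + schemaName + "' differs from the initially-registered schema of the same name");
    }
  }

  boolean isRegistered(String schemaName) {
    return schemaByName.containsKey(schemaName);
  }
}
```

In this picture, registering MyEvent would also register MyNestedEvent under its own name, and any later schema that embeds a record named MyNestedEvent would have to carry the same definition.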

Upon registering an Avro event type, the compiler and runtime determine property names and property types. The Avro record field schema determines the property type.

The table below describes Avro field schema to property type mapping:


For unions:

  1. If the union contains null and any of the primitive types, the property type is the boxed type. For example unionOf().nullType().and().intType().endUnion() is Integer.class.

  2. If the union contains null and numeric types only, the property type is Number.class. For example unionOf().longType().and().intType().endUnion() is Number.class.

  3. Otherwise the property type is Object.class.
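The three union rules above can be sketched in plain Java. This is an illustration under stated assumptions and is not Esper's implementation. UnionPropertyTypeSketch and resolve are hypothetical names, union members are given as Avro type names, only boolean, int, long, float and double are treated as boxable primitives, and null is treated as optional in rule 2 because the example for that rule contains no null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the union rules; not part of Esper or Avro.
final class UnionPropertyTypeSketch {

  // Assumption: only these Avro primitives are boxed; string and bytes are left out.
  private static final Map<String, Class<?>> BOXED = Map.of(
      "boolean", Boolean.class,
      "int", Integer.class,
      "long", Long.class,
      "float", Float.class,
      "double", Double.class);

  private static final List<String> NUMERIC = List.of("int", "long", "float", "double");

  // Maps the member type names of a union to a property type.
  static Class<?> resolve(List<String> unionMembers) {
    boolean hasNull = unionMembers.contains("null");
    List<String> nonNull = new ArrayList<>(unionMembers);
    nonNull.removeIf("null"::equals);

    // Rule 1: null plus exactly one boxable primitive gives the boxed type.
    if (hasNull && nonNull.size() == 1 && BOXED.containsKey(nonNull.get(0))) {
      return BOXED.get(nonNull.get(0));
    }
    // Rule 2: numeric members only (null permitted, by assumption) gives Number.
    if (!nonNull.isEmpty() && NUMERIC.containsAll(nonNull)) {
      return Number.class;
    }
    // Rule 3: anything else gives Object.
    return Object.class;
  }

  public static void main(String[] args) {
    System.out.println(resolve(List.of("null", "int")));
    System.out.println(resolve(List.of("long", "int")));
    System.out.println(resolve(List.of("int", "string")));
  }
}
```

The rules are checked in order, so a union of null and int is resolved by rule 1 before rule 2 is considered. Single-member unions without null are not covered by the rules above and simply fall through to rule 2 or 3 in this sketch.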

This section lists for each JVM type the default Avro schema that the compiler and runtime use when assembling an Avro schema from a select-clause.

For example, consider the following statement. The statement assumes that MyEvent is a pre-registered event type of any kind (Map, Avro, Object-Array, POJO etc.):

@EventRepresentation(avro) select 1 as carId, 'abc' as carType from MyEvent

Your application may obtain the schema for the statement output event type as follows:

String epl = "@EventRepresentation(avro) select 1 as carId, 'abc' as carType from MyEvent";
Configuration configuration = new Configuration();
configuration.getCommon().addEventType(MyEvent.class);
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, compilerArguments);

// Deploy the compiled module and take its single statement
EPStatement stmt = EPRuntimeProvider.getDefaultRuntime().getDeploymentService().deploy(compiled).getStatements()[0];
Schema schema = (Schema) ((AvroSchemaEventType) stmt.getEventType()).getSchema();

The compiler generates an Avro schema based on the expressions in the select-clause. The schema in pretty-print may look like this:

{
  "type" : "record",
  "name" : "anonymous_1_result_",
  "fields" : [ {
    "name" : "carId",
    "type" : "int"
  }, {
    "name" : "carType",
    "type" : {
      "type" : "string",
      "avro.java.string" : "String"
    }
  } ]
}

Please consult Section 17.4.8.2, “Avro Settings” for details on controlling default mapping.

By default the compiler maps expression result types to Avro schema using non-null schema types. By default, for String-type values, the compiler sets the avro.java.string property to String to ensure that Avro uses java.lang.String to represent strings (and not org.apache.avro.util.Utf8). The tables below outline the default mapping and provide alternative schemas, which apply according to Avro settings.

The mapping from primitive and string type to Avro schema is:


The mapping from array-type to Avro schema is:


Additional mappings to Avro schema are:


EPL provides the @AvroSchemaField annotation to assign a schema to a given property. The annotation requires the name attribute for the property name and the schema attribute for the Avro schema text.

The schema provided via @AvroSchemaField for a given property replaces the Avro schema that is otherwise assigned according to the above mapping.

The annotation can be used with create-schema or with @EventRepresentation(avro).

In this example the carId property is a union of int-type and string-type.

@AvroSchemaField(name='carId',schema='["int","string"]') create avro schema MyEvent(carId object)

The compiler determines the property type from the Avro field schema according to the rules listed above.

In the default configuration only the primitive data types and the above-mentioned classes have a corresponding Avro schema. When the compiler encounters a class for which it does not know the Avro schema that it should use, it fails the statement compile.

For example, the below EPL is invalid:

// Invalid since LocalDateTime has no equivalent Avro schema (by default)
create avro schema MyEvent(ldt as java.time.LocalDateTime)

Instead of using @AvroSchemaField your application can configure a type-representation mapper class that can return the Avro schema to use. For configuration information please see Section 17.4.8.2, “Avro Settings” and the JavaDoc.

Your application can implement the com.espertech.esper.common.client.hook.TypeRepresentationMapper interface. The compiler and runtime invoke the provided mapper to determine the Avro schema for a given field.

For example, the following type mapper implementation maps LocalDateTime to the Avro string type.

public class MyTypeRepresentationMapper implements TypeRepresentationMapper {
  public Object map(TypeRepresentationMapperContext context) {
    if (context.getClazz() == LocalDateTime.class) {
      return builder().stringBuilder().endString();
    }
    return null;
  }
}

The compiler can automatically widen and assign values to Avro fields. When your application requires custom logic to convert, widen, coerce or transform a value before assignment to an Avro field, please use the mechanism below.

Your application can implement the com.espertech.esper.common.client.hook.ObjectValueTypeWidenerFactory interface. The compiler invokes the provided factory to determine a widener for values.

For example, the factory implementation below returns a type widener that converts LocalDateTime instances to Avro string-type values by using the date-time formatter:

public static class MyObjectValueTypeWidenerFactory implements ObjectValueTypeWidenerFactory {

  public TypeWidener make(ObjectValueTypeWidenerFactoryContext context) {
    if (context.getClazz() == LocalDateTime.class) {
      return new TypeWidener() {
        public Object widen(Object input) {
          LocalDateTime ldt = (LocalDateTime) input;
          return DateTimeFormatter.ISO_DATE_TIME.format(ldt);
        }
      };
    }
    return null;
  }
}
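To see what the widener above produces, the conversion can be tried without Esper on the classpath. This is a minimal sketch in which java.util.function.Function stands in for the TypeWidener interface. LocalDateTimeWidenSketch and WIDENER are hypothetical names introduced for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

// Hypothetical stand-alone sketch; Function replaces Esper's TypeWidener
// so that the conversion can be run without an Esper dependency.
final class LocalDateTimeWidenSketch {

  // Same conversion as in the factory: format the LocalDateTime as an ISO date-time string.
  static final Function<Object, Object> WIDENER =
      input -> DateTimeFormatter.ISO_DATE_TIME.format((LocalDateTime) input);

  public static void main(String[] args) {
    LocalDateTime original = LocalDateTime.of(2020, 1, 2, 3, 4, 5);
    Object widened = WIDENER.apply(original);
    System.out.println(widened); // prints 2020-01-02T03:04:05

    // The string value can be parsed back to the original LocalDateTime.
    LocalDateTime parsed = LocalDateTime.parse((String) widened, DateTimeFormatter.ISO_DATE_TIME);
    System.out.println(parsed.equals(original)); // prints true
  }
}
```

The resulting value is a java.lang.String, which fits a field whose Avro schema is the string type, such as the one a type-representation mapper could return for LocalDateTime.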

To obtain the Avro schema for a given event type, use:

public static Schema getAvroSchema(EventType eventType) {
  return (Schema) ((AvroSchemaEventType) eventType).getSchema();
}

To obtain the Avro schema for a registered event type, you may use:

public static Schema getAvroSchema(EPRuntime runtime, String eventTypeName) {
  return getAvroSchema(runtime.getEventTypeService().getEventType(eventTypeName));
}

To obtain the Avro schema for a given event, you may use:

public static Schema getAvroSchema(EventBean event) {
  return getAvroSchema(event.getEventType());
}

To obtain the GenericData.Record for a given event, you may use:

public static GenericData.Record getAvroRecord(EventBean event) {
  return (GenericData.Record) event.getUnderlying();
}

The following limitations apply: