
Chapter 2. The File and CSV Input and Output Adapter

2.1. Data Flow Operators
2.1.1. Introduction
2.1.2. FileSink Operator
2.1.3. FileSource Operator
2.2. CSV Input Adapter API
2.2.1. Introduction
2.2.2. Playback of CSV-formatted Events
2.2.3. CSV Playback Options
2.2.4. Simulating Multiple Event Streams
2.2.5. Pausing and Resuming Operation

The file input and output adapter consists of:

  1. File (including CSV) input and output utilizing data flow operators.

  2. The CSV input adapter API.

The FileSource operator reads from files, transforms file data and populates a data flow instance with events.

The FileSource operator cannot declare any input streams.

The FileSource operator must have at least one output stream. You can declare additional output streams to hold beginning-of-file and end-of-file indication.

Parameters for the FileSource operator are listed below, with the required parameters listed first:


The first output stream holds per-line output events. When using the line format and declaring two output streams, the second stream holds the end-of-file indication. When declaring three output streams, the second stream holds the beginning-of-file indication and the third stream holds the end-of-file indication.

The line format requires that the output stream's event type is an object-array event type that features a single string-type property that the operator populates with each line of the file.

The file name (or adapterInputSource) may point to a zip file. If the file name ends with the literal zip, the operator opens the zip file and uses the first packaged file. All other parameters, including the format parameter for CSV or line-formatting, then apply to the zipped file.
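As a rough illustration of the zip handling described above (a hypothetical sketch using java.util.zip, not the EsperIO implementation), a reader can be opened over the first packaged entry like so:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.*;

// Hypothetical sketch of how a reader might be opened over the first
// packaged entry of a zip file, as the FileSource operator does for
// zip-suffixed file names. Not the actual EsperIO implementation.
public class ZipFirstEntry {

    // Builds an in-memory zip holding a single named entry (test helper).
    public static byte[] zipWithOneEntry(String name, String content) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ZipOutputStream zos = new ZipOutputStream(bytes);
            zos.putNextEntry(new ZipEntry(name));
            zos.write(content.getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.close();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns a reader over the first entry found in the zip stream.
    public static BufferedReader firstEntryReader(InputStream in) {
        try {
            ZipInputStream zip = new ZipInputStream(in);
            ZipEntry entry = zip.getNextEntry();   // first packaged file
            if (entry == null) {
                throw new IOException("zip file contains no entries");
            }
            return new BufferedReader(new InputStreamReader(zip, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] zip = zipWithOneEntry("sensor_events.csv", "s1,20.5,2013-01-01\n");
        BufferedReader reader = firstEntryReader(new ByteArrayInputStream(zip));
        System.out.println(reader.readLine());
    }
}
```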

This example defines a data flow that consists of two operators that work together to read a file and send the resulting events into the engine:

create dataflow SensorCSVFlow
  FileSource -> sensorstream<TemperatureEventStream> {
    file: 'sensor_events.csv', 
    propertyNames: ['sensor','temp','updtime'], 
    numLoops: 3
  }
  EventBusSink(sensorstream){}

The data flow above configures the FileSource operator to read the file sensor_events.csv, populate the sensor, temp and updtime properties of the TemperatureEventStream event type (type definition not shown here) and make the output events available within the data flow under the name sensorstream.

The data flow above configures the EventBusSink operator to send the sensorstream events into the engine for processing.

This example shows the EPL and code to read and count lines in text files.

The EPL below defines event types to hold the file line text as well as to indicate the beginning and end of a file (remove the semicolon if creating EPL individually and not as a module):

// for beginning-of-file events
create objectarray schema MyBOF (filename string); 
// for end of file events
create objectarray schema MyEOF (filename string); 
// for line text events
create objectarray schema MyLine (filename string, line string);  
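Since the schemas above are objectarray types, each event is represented as a plain Object[] whose positions follow the declared property order. A minimal sketch (the helper method is hypothetical, not generated by Esper):

```java
// Sketch: an objectarray event is an Object[] whose positions follow the
// declared property order; for MyLine(filename string, line string) the
// filename is position 0 and the line text is position 1.
public class ObjectArrayEvents {

    // Hypothetical helper building a MyLine event as an object array.
    public static Object[] myLineEvent(String filename, String line) {
        return new Object[] { filename, line };
    }

    public static void main(String[] args) {
        Object[] event = myLineEvent("notes.txt", "first line of text");
        System.out.println(event[0] + ": " + event[1]);
    }
}
```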

The next EPL statements count lines per file, outputting the final line count only when the end-of-file is reached.

// Initiate a context partition for each file, terminate upon end-of-file
create context FileContext 
  initiated by MyBOF as mybof 
  terminated by MyEOF(filename=mybof.filename);
  
// For each file, count lines 
context FileContext 
  select context.mybof.filename as filename, count(*) as cnt
  from MyLine(filename=context.mybof.filename)
  output snapshot when terminated;

The EPL below defines a data flow that reads text files line-by-line and sends events into the engine for processing.

create dataflow MyEOFEventFileReader
  FileSource -> mylines<MyLine>, mybof<MyBOF>, myeof<MyEOF> { 
    format: 'line', 
    propertyNameLine: 'line',      // store the text in the event property 'line' 
    propertyNameFile: 'filename'   // store the file name in 'filename'
  }
  EventBusSink(mylines, mybof, myeof) {}  // send events into engine

The next sample code instantiates and runs data flows passing a file name:

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", "myfile.txt");
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime()
    .instantiate("MyEOFEventFileReader",options);
instance.run();

This chapter discusses the CSV input adapter API. CSV is an abbreviation for comma-separated values. CSV files are simple text files in which each line is a comma-separated list of values. CSV-formatted text can be read from many different input sources via com.espertech.esperio.csv.AdapterInputSource. Please consult the JavaDoc for additional information on AdapterInputSource and the CSV adapter.

The adapter reads events from a CSV input source and sends events to an engine using the class com.espertech.esperio.csv.CSVInputAdapter.

The code snippet below reads the CSV-formatted text file "simulation.csv", expecting the file in the classpath. The AdapterInputSource class can take other input sources.

AdapterInputSource source = new AdapterInputSource("simulation.csv");
(new CSVInputAdapter(epServiceProvider, source, "PriceEvent")).start();

To use the CSVInputAdapter without any options, the event type PriceEvent and its property names and value types must be known to the engine. The next section elaborates on adapter options.

The sample application code below shows all the steps to configure, via API, a Map-based event type and play the CSV file without setting any of the available options.

Map<String, Class> eventProperties = new HashMap<String, Class>();
eventProperties.put("symbol", String.class);
eventProperties.put("price", double.class);
eventProperties.put("volume", Integer.class);

Configuration configuration = new Configuration();
configuration.addEventType("PriceEvent", eventProperties);

epService = EPServiceProviderManager.getDefaultProvider(configuration);

EPStatement stmt = epService.getEPAdministrator().createEPL(
   "select symbol, price, volume from PriceEvent.win:length(100)");

(new CSVInputAdapter(epService, new AdapterInputSource(filename), "PriceEvent")).start();

The contents of a sample CSV file are shown next.

symbol,price,volume
IBM,55.5,1000
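Conceptually, the adapter pairs each header column with the value in the same position of each data row. The sketch below illustrates that mapping only; the real adapter additionally converts values to the configured property types, and real CSV parsing must also handle quoting:

```java
import java.util.*;

// Simplified illustration of pairing a CSV header row with a data row,
// the way columns map to event properties. Real CSV parsing also has to
// handle quoting and escaping, which this sketch ignores.
public class CsvRowMapper {

    public static Map<String, String> toProperties(String headerLine, String dataLine) {
        String[] names = headerLine.split(",");
        String[] values = dataLine.split(",");
        Map<String, String> event = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            event.put(names[i].trim(), values[i].trim());
        }
        return event;
    }

    public static void main(String[] args) {
        System.out.println(toProperties("symbol,price,volume", "IBM,55.5,1000"));
    }
}
```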

The next code snippet outlines using a java.io.Reader as an alternative input source:

String myCSV = "symbol, price, volume" + NEW_LINE + "IBM, 10.2, 10000";
StringReader reader = new StringReader(myCSV);
(new CSVInputAdapter(epService, new AdapterInputSource(reader), "PriceEvent")).start();

In the previous code samples, the PriceEvent properties were defined programmatically with their correct types. It is possible to skip this step and use only a column header record. In that case you must define property types in the header; otherwise a type of String is assumed.

Consider the following:

symbol,double price, int volume
IBM,55.5,1000

symbol,price,volume
IBM,55.5,1000

The first CSV file defines explicit types in the column header while the second file does not. With the second file, a statement like select sum(volume) from PriceEvent.win:time(1 min) is rejected because volume defaults to type String - unless otherwise programmatically configured.
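The header convention above can be sketched as a small parser: a column of the form "double price" carries an explicit type, while a bare column name defaults to String. This is an illustration of the rule only, not the adapter's actual parsing code:

```java
import java.util.*;

// Sketch of the header-typing rule described above: "double price" yields
// (price, Double) while a bare "price" defaults to (price, String).
// Illustration only -- not the adapter's actual parser.
public class HeaderTypes {

    private static final Map<String, Class<?>> KNOWN_TYPES = Map.of(
            "double", Double.class,
            "int", Integer.class,
            "long", Long.class,
            "boolean", Boolean.class,
            "string", String.class);

    public static Map<String, Class<?>> parseHeader(String headerLine) {
        Map<String, Class<?>> types = new LinkedHashMap<>();
        for (String column : headerLine.split(",")) {
            String[] parts = column.trim().split("\\s+");
            if (parts.length == 2 && KNOWN_TYPES.containsKey(parts[0])) {
                types.put(parts[1], KNOWN_TYPES.get(parts[0]));   // explicit type
            } else {
                types.put(parts[parts.length - 1], String.class); // default: String
            }
        }
        return types;
    }

    public static void main(String[] args) {
        System.out.println(parseHeader("symbol,double price,int volume"));
    }
}
```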

Use the CSVInputAdapterSpec class to set playback options. The following options are available:

The next code snippet shows the use of CSVInputAdapterSpec to set playback options.

CSVInputAdapterSpec spec = new CSVInputAdapterSpec(new AdapterInputSource(myURL), "PriceEvent");
spec.setEventsPerSec(1000);
spec.setLooping(true);
  
InputAdapter inputAdapter = new CSVInputAdapter(epService, spec);
inputAdapter.start();	// method blocks unless engine thread option is set
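The eventsPerSec setting shown above throttles playback. In spirit, a fixed rate can be enforced by sleeping between sends; the sketch below is a hypothetical pacing loop, not the adapter's implementation, and the "send" is just a println:

```java
import java.util.List;

// Hypothetical pacing loop illustrating the idea behind an events-per-second
// setting: a fixed delay between sends. The "send" here is a placeholder.
public class PacedSender {

    // Delay between consecutive events, in milliseconds.
    public static long interEventDelayMillis(int eventsPerSec) {
        return 1000L / eventsPerSec;
    }

    public static void sendPaced(List<String> lines, int eventsPerSec)
            throws InterruptedException {
        long delay = interEventDelayMillis(eventsPerSec);
        for (String line : lines) {
            System.out.println(line);   // placeholder for sending into the engine
            Thread.sleep(delay);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        sendPaced(List.of("IBM,55.5,1000", "MSFT,30.1,500"), 1000);
    }
}
```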

The CSV input adapter can run simulations of events arriving in time-order from different input streams. Use the AdapterCoordinator as a specialized input adapter for coordinating multiple CSV input sources by timestamp.

The sample application code listed below simulates price and trade events arriving in timestamp order. Via the adapter the application reads two CSV-formatted files from a URL that each contain a timestamp column as well as price or trade events. The AdapterCoordinator uses the timestamp column to send events to the engine in the exact ordering prescribed by the timestamp values.

AdapterInputSource sourceOne = new AdapterInputSource(new URL("FILE://prices.csv"));
CSVInputAdapterSpec inputOne = new CSVInputAdapterSpec(sourceOne, "PriceEvent");
inputOne.setTimestampColumn("timestamp");

AdapterInputSource sourceTwo = new AdapterInputSource(new URL("FILE://trades.csv"));
CSVInputAdapterSpec inputTwo = new CSVInputAdapterSpec(sourceTwo, "TradeEvent");
inputTwo.setTimestampColumn("timestamp");

AdapterCoordinator coordinator = new AdapterCoordinatorImpl(epService, true);
coordinator.coordinate(new CSVInputAdapter(inputOne));
coordinator.coordinate(new CSVInputAdapter(inputTwo));
coordinator.start();

The AdapterCoordinatorImpl takes two parameters: the engine instance, and a boolean flag that, when true, instructs the adapter to use the engine timer thread; when false, the adapter uses the application thread.

You may not set an event rate per second when using a timestamp column and time-order.
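Conceptually, coordinating by timestamp is a merge of several timestamped sequences into a single ordered sequence. The sketch below shows the idea with a priority queue; the Event type is hypothetical, and the real AdapterCoordinator works on adapters, not in-memory lists:

```java
import java.util.*;

// Conceptual sketch of timestamp-ordered coordination: events from several
// sources are merged and released in ascending timestamp order. The Event
// type is hypothetical; AdapterCoordinator works on adapters, not lists.
public class TimestampMerge {

    public record Event(long timestamp, String payload) {}

    public static List<Event> merge(List<Event> prices, List<Event> trades) {
        PriorityQueue<Event> queue =
                new PriorityQueue<>(Comparator.comparingLong(Event::timestamp));
        queue.addAll(prices);
        queue.addAll(trades);
        List<Event> ordered = new ArrayList<>();
        while (!queue.isEmpty()) {
            ordered.add(queue.poll());   // always the earliest remaining event
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<Event> merged = merge(
                List.of(new Event(1, "price IBM 55.5"), new Event(5, "price IBM 55.6")),
                List.of(new Event(3, "trade IBM 1000")));
        for (Event e : merged) {
            System.out.println(e.timestamp() + " " + e.payload());
        }
    }
}
```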