The file input and output adapter consists of:
File (including CSV) input and output utilizing data flow operators.
The CSV input adapter API.
In order to use the File source and sink data flow operators, add esperio-csv-version.jar to your classpath and import the operator package or class using the static or runtime configuration.
The following code snippet uses the runtime configuration API to import the File adapter classes:
epService.getEPAdministrator().getConfiguration()
    .addImport(FileSourceFactory.class.getPackage().getName() + ".*");
The File input and output adapter provides the following data flow operators:
Table 2.1. File Operators
Operator | Description |
---|---|
FileSink | Write events to a file. See Section 2.1.2, “FileSink Operator”. |
FileSource | Read events from a file. See Section 2.1.3, “FileSource Operator”. |
The FileSink operator receives input stream events, transforms events to comma-separated format and writes to a file.
The FileSink operator must have a single input stream.
The FileSink operator cannot declare any output streams.
Parameters for the FileSink operator are (required parameters are listed first):
Table 2.2. FileSink Parameters
Name | Description |
---|---|
file (required) | File name string. An absolute or relative file name. |
classpathFile | Boolean indicator whether the file name is found in a classpath directory, false by default. |
append | Boolean indicator whether to append or overwrite the file when it exists. false by default causes the existing file, if any, to be overwritten. |
The following example declares a data flow that is triggered by MyMapEventType events from the event bus (type not declared here) and writes the CSV-formatted events to the file output.csv:

create dataflow FileSinkSampleFlow
  EventBusSource -> outstream<MyMapEventType> {}
  FileSink(outstream) {
    file: 'output.csv',
    append: true
  }
The FileSource operator reads from files, transforms file data and populates a data flow instance with events.
The FileSource operator cannot declare any input streams.
The FileSource operator must have at least one output stream. You can declare additional output streams to hold beginning-of-file and end-of-file indication.
Parameters for the FileSource operator are listed below, with the required parameters listed first:
Table 2.3. FileSource Parameters
Name | Description |
---|---|
file (required, or provide adapterInputSource) | File name string. |
adapterInputSource (required, or provide file) | An instance of AdapterInputSource if a file name cannot be provided. |
classpathFile | Boolean indicator whether the file is found in a classpath directory, false by default. |
dateFormat | The format to use when parsing dates; the default is SimpleDateFormat of yyyy-MM-dd'T'HH:mm:ss.SSS for Date and Calendar type properties. |
format | Specify csv (the default) for comma-separated values or line for single-line text. |
hasTitleLine | For use with the csv format, boolean indicator whether a title line exists that the operator should read and parse to obtain event property names. |
hasHeaderLine | For use with the csv format, boolean indicator whether a header line exists that the operator should skip. |
numLoops | For use with the csv format, number of loops, an integer value that instructs the engine to restart reading the file upon encountering EOF, defaults to zero. |
propertyNames | For use with the csv format, string array with a list of property names in the same order they appear in the file. |
propertyNameLine | For use with the line format, specifies the property name of the output event type that receives the line text of type string. |
propertyNameFile | For use with the line format, specifies the property name of the output event type(s) that receive the file name of type string. |
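The default dateFormat pattern in the table above follows java.text.SimpleDateFormat conventions. As a sketch of what that default accepts for Date and Calendar properties (plain JDK code; the class name is ours, not part of EsperIO):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DefaultDateFormatDemo {
    // The default pattern the FileSource operator falls back to
    static final String DEFAULT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";

    static Date parse(String value) {
        try {
            return new SimpleDateFormat(DEFAULT_PATTERN).parse(value);
        } catch (ParseException e) {
            throw new IllegalArgumentException("not in default format: " + value, e);
        }
    }

    public static void main(String[] args) {
        // values such as 2011-03-05T14:30:00.250 parse under the default pattern
        System.out.println(parse("2011-03-05T14:30:00.250"));
    }
}
```

A file whose date columns use a different layout would need the dateFormat parameter set to a matching pattern.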
The first output stream holds per-line output events. For use with the line format and if declaring two output streams, the second stream holds end-of-file indication. If declaring three output streams, the second stream holds beginning-of-file indication and the third stream holds end-of-file indication.
The line format requires that the output stream's event type is an object-array event type that features a single string-type property that the operator populates with each line of the file.
The file name (or adapterInputSource) may point to a zip file. If the file name ends with the literal zip, the operator opens the zip file and uses the first packaged file. All other parameters, including the format parameter for CSV or line-formatting, then apply to the zipped file.
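For instance, a data flow reading the first CSV file packaged inside a zip archive might be declared as follows (a sketch only; the file name and event type are hypothetical):

```
create dataflow ZipSensorFlow
  FileSource -> sensorstream<TemperatureEventStream> {
    file: 'sensor_events.zip',  // name ends in zip: the first packaged file is read
    propertyNames: ['sensor','temp','updtime']
  }
  EventBusSink(sensorstream) {}
```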
This example defines a data flow that consists of two operators that work together to read a file and send the resulting events into the engine:
create dataflow SensorCSVFlow
  FileSource -> sensorstream<TemperatureEventStream> {
    file: 'sensor_events.csv',
    propertyNames: ['sensor','temp','updtime'],
    numLoops: 3
  }
  EventBusSink(sensorstream){}
The data flow above configures the FileSource operator to read the file sensor_events.csv, populate the sensor, temp and updtime properties of the TemperatureEventStream event type (type definition not shown here) and make the output events available within the data flow under the name sensorstream.
The data flow above configures the EventBusSink operator to send the sensorstream events into the engine for processing.
This example shows the EPL and code to read and count lines in text files.
The EPL below defines event types to hold the file line text as well as to indicate the beginning and end of a file (remove the semicolons if creating the EPL statements individually and not as a module):
// for beginning-of-file events
create objectarray schema MyBOF (filename string);
// for end of file events
create objectarray schema MyEOF (filename string);
// for line text events
create objectarray schema MyLine (filename string, line string);
The next EPL statements count lines per file outputting the final line count only when the end-of-file is reached.
// Initiate a context partition for each file, terminate upon end-of-file
create context FileContext
  initiated by MyBOF as mybof
  terminated by MyEOF(filename=mybof.filename);

// For each file, count lines
context FileContext
select context.mybof.filename as filename, count(*) as cnt
from MyLine(filename=context.mybof.filename)
output snapshot when terminated;
The below EPL defines a data flow that reads text files line-by-line and sends events into the engine for processing:
create dataflow MyEOFEventFileReader
  FileSource -> mylines<MyLine>, mybof<MyBOF>, myeof<MyEOF> {
    format: 'line',
    propertyNameLine: 'line',      // store the line text in the event property 'line'
    propertyNameFile: 'filename'   // store the file name in 'filename'
  }
  EventBusSink(mylines, mybof, myeof) {}  // send events into the engine
The next sample code instantiates and runs the data flow, passing a file name:
EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", "myfile.txt");
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime()
    .instantiate("MyEOFEventFileReader", options);
instance.run();
This chapter discusses the CSV input adapter API. CSV is an abbreviation for comma-separated values. CSV files are simple text files in which each line is a comma-separated list of values. CSV-formatted text can be read from many different input sources via com.espertech.esperio.AdapterInputSource.
Please consult the JavaDoc for additional information on AdapterInputSource and the CSV adapter.
In summary the CSV input adapter API performs the following functions:
Read events from an input source providing CSV-formatted text and send the events to an Esper engine instance
Read from different types of input sources
Use a timestamp column to schedule events being sent into the engine
Playback with options such as file looping, events per second and other options
Use the Esper engine timer thread to read the CSV file
Read multiple CSV files using a timestamp column to simulate events coming from different streams
The following formatting rules and restrictions apply to CSV-formatted text:
Comment lines are prefixed with a single hash or pound # character
Strings are placed in double quotes, e.g. "value"
Escape rules follow common spreadsheet conventions, i.e. double quotes can be escaped via double quote
A column header is required unless a property order is defined explicitly
If a column header is used, properties are assumed to be of type String unless otherwise configured
The value of the timestamp column, if one is given, must be in ascending order
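Putting these rules together, a minimal well-formed input might look as follows (a hypothetical file; note the comment line, the quoted string with a doubled double quote as escape, and the ascending timestamp column):

```
# hypothetical price events, timestamp column ascending
timestamp,symbol,note
100,IBM,"analyst said ""buy"" today"
200,GOOG,plain unquoted text
```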
The adapter reads events from a CSV input source and sends events to an engine using the class com.espertech.esperio.csv.CSVInputAdapter
.
The below code snippet reads the CSV-formatted text file "simulation.csv", expecting the file in the classpath. The AdapterInputSource class can take other input sources.

AdapterInputSource source = new AdapterInputSource("simulation.csv");
(new CSVInputAdapter(epServiceProvider, source, "PriceEvent")).start();
To use the CSVInputAdapter without any options, the event type PriceEvent and its property names and value types must be known to the engine. The next section elaborates on adapter options.
Configure the engine instance for a Map-based event type
Place a header record in your CSV file that names each column as specified in the event type
The sample application code below shows all the steps to configure, via API, a Map-based event type and play the CSV file without setting any of the available options.
Map<String, Class> eventProperties = new HashMap<String, Class>();
eventProperties.put("symbol", String.class);
eventProperties.put("price", double.class);
eventProperties.put("volume", Integer.class);

Configuration configuration = new Configuration();
configuration.addEventType("PriceEvent", eventProperties);
epService = EPServiceProviderManager.getDefaultProvider(configuration);

EPStatement stmt = epService.getEPAdministrator().createEPL(
    "select symbol, price, volume from PriceEvent.win:length(100)");

(new CSVInputAdapter(epService, new AdapterInputSource(filename), "PriceEvent")).start();
The contents of a sample CSV file are shown next.

symbol,price,volume
IBM,55.5,1000
The next code snippet outlines using a java.io.Reader as an alternative input source:

String myCSV = "symbol, price, volume" + NEW_LINE + "IBM, 10.2, 10000";
StringReader reader = new StringReader(myCSV);
(new CSVInputAdapter(epService, new AdapterInputSource(reader), "PriceEvent")).start();
In the previous code samples, the PriceEvent properties were defined programmatically with their correct types. It is possible to skip this step and use only a column header record. In such a case you must define property types in the header, otherwise a type of String is assumed.
Consider the following:

symbol,double price, int volume
IBM,55.5,1000

symbol,price,volume
IBM,55.5,1000
The first CSV file defines explicit types in the column header while the second file does not. With the second file a statement like
select sum(volume) from PriceEvent.win:time(1 min)
will be rejected, as in the second file volume defaults to type String unless otherwise programmatically configured.
The previous section used an event type based on java.util.Map. The adapter can also populate the CSV data into JavaBean events directly, as long as your event class provides setter-methods that follow JavaBean conventions. Note that EsperIO will ignore read-only properties, i.e. if you have a read-only property priceByVolume it will not expect a corresponding column in the input file.
To use Java objects as events instead of Map-based event types, simply register the event type name for the Java class and provide the same name to the CSV adapter.
The below code snippet assumes that a PriceEvent class exists that exposes setter-methods for the three properties. The setter-methods are, for example, setSymbol(String s), setPrice(double p) and setVolume(long v).
Configuration configuration = new Configuration();
configuration.addEventType("PriceEvent", PriceEvent.class);
epService = EPServiceProviderManager.getDefaultProvider(configuration);

EPStatement stmt = epService.getEPAdministrator().createEPL(
    "select symbol, price, volume from PriceEvent.win:length(100)");

(new CSVInputAdapter(epService, new AdapterInputSource(filename), "PriceEvent")).start();
When using JavaBean POJO events, the event property types are known from the underlying event type configuration. The CSV file row header does not need to define column types explicitly.
Whether you use JavaBean POJO or Map-based event types, EsperIO provides support for nested event properties up to one level of nesting. The row header must then refer to the properties using a propertyName.nestedPropertyName syntax.
There is no support for mapped or indexed properties.
For example consider the following:
public class Point {
    int x;
    int y;
    // with getters & setters
}

public class Figure {
    String name;
    Point point;  // point.x and point.y are nested properties
    // with getters & setters
}
With these classes (or the equivalent representation with nested maps), and assuming "Figure" is the declared event type name, the CSV file can contain the following row header:
name, point.x, point.y
Use the CSVInputAdapterSpec class to set playback options. The following options are available:
Loop - Reads the CSV input source in a loop; When the end is reached, the input adapter rewinds to the beginning
Events per second - Controls the number of events per second that the adapter sends to the engine
Property order - Controls the order of event property values in the CSV input source, for use when the CSV input source does not have a header column
Property types - Defines a new Map-based event type given a map of event property names and types. No engine configuration for the event type is required as long as the input adapter is created before statements against the event type are created.
Engine thread - Instructs the adapter to use the engine timer thread to read the CSV input source and send events to the engine
External timer - Instructs the adapter to use Esper's external timer rather than the internal timer. See "Sending timer events" below
Timestamp column name - Defines the name of the timestamp column in the CSV input source; The timestamp column must carry long-typed timestamp values relative to the current time; Use zero for the current time
The next code snippet shows the use of CSVInputAdapterSpec to set playback options.
CSVInputAdapterSpec spec = new CSVInputAdapterSpec(new AdapterInputSource(myURL), "PriceEvent");
spec.setEventsPerSec(1000);
spec.setLooping(true);

InputAdapter inputAdapter = new CSVInputAdapter(epService, spec);
inputAdapter.start(); // method blocks unless engine thread option is set
The adapter can be instructed to use either Esper's internal timer, or to drive timing itself by sending external timer events. If the internal timer is used, EsperIO will send all events in "real time". For example, if an input file contains the following data:
symbol,price,volume,timestamp
IBM,55.5,1000,2
GOOG,9.5,1000,3
MSFT,8.5,1000,3
JAVA,7.5,1000,1004
then EsperIO will sleep for 1001 milliseconds between sending the MSFT and JAVA events to the engine.
If external timing is enabled then EsperIO will run through the input file at full speed without pausing. The algorithm used sends a time event after all events for a particular time have been received. For the above example file, a time event for 2 will be sent after IBM, for 3 after MSFT and for 1004 after JAVA. For many use cases this gives a performance improvement.
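The time-event placement described above can be sketched in plain Java (our own illustration, not EsperIO code): given rows already ordered by timestamp, a timer event for a timestamp is emitted once the last event carrying that timestamp has been sent.

```java
import java.util.ArrayList;
import java.util.List;

public class ExternalTimerSchedule {
    // Returns the send sequence: each event symbol, plus a "t=<ts>" timer
    // event once all events sharing that timestamp have been sent.
    // Timestamps must be in ascending order, as the CSV rules require.
    static List<String> sendSequence(String[] symbols, long[] timestamps) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < symbols.length; i++) {
            out.add(symbols[i]);
            boolean lastForTimestamp =
                (i == symbols.length - 1) || timestamps[i + 1] != timestamps[i];
            if (lastForTimestamp) {
                out.add("t=" + timestamps[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // the sample file above: IBM@2, GOOG@3, MSFT@3, JAVA@1004
        System.out.println(sendSequence(
            new String[] {"IBM", "GOOG", "MSFT", "JAVA"},
            new long[] {2, 3, 3, 1004}));
        // prints [IBM, t=2, GOOG, MSFT, t=3, JAVA, t=1004]
    }
}
```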
The CSV input adapter can run simulations of events arriving in time-order from different input streams. Use the AdapterCoordinator as a specialized input adapter for coordinating multiple CSV input sources by timestamp.
The sample application code listed below simulates price and trade events arriving in timestamp order. Via the adapter the application reads two CSV-formatted files from a URL, each containing a timestamp column as well as price or trade events. The AdapterCoordinator uses the timestamp column to send events to the engine in the exact ordering prescribed by the timestamp values.
AdapterInputSource sourceOne = new AdapterInputSource(new URL("FILE://prices.csv"));
CSVInputAdapterSpec inputOne = new CSVInputAdapterSpec(sourceOne, "PriceEvent");
inputOne.setTimestampColumn("timestamp");

AdapterInputSource sourceTwo = new AdapterInputSource(new URL("FILE://trades.csv"));
CSVInputAdapterSpec inputTwo = new CSVInputAdapterSpec(sourceTwo, "TradeEvent");
inputTwo.setTimestampColumn("timestamp");

AdapterCoordinator coordinator = new AdapterCoordinatorImpl(epService, true);
coordinator.coordinate(new CSVInputAdapter(inputOne));
coordinator.coordinate(new CSVInputAdapter(inputTwo));
coordinator.start();
The AdapterCoordinatorImpl is provided with two parameters: the engine instance, and a boolean value that instructs the adapter to use the engine timer thread if set to true, or the application thread if set to false.
You may not set an event rate per second when using a timestamp column and time-order.
The CSV adapter can employ the engine timer thread of an Esper engine instance to read and send events. This can be controlled via the setUsingEngineThread method on CSVInputAdapterSpec. We use that feature in the sample code below to pause and resume a running CSV input adapter.
CSVInputAdapterSpec spec = new CSVInputAdapterSpec(new AdapterInputSource(myURL), "PriceEvent");
spec.setEventsPerSec(100);
spec.setUsingEngineThread(true);

InputAdapter inputAdapter = new CSVInputAdapter(epService, spec);
inputAdapter.start(); // method starts adapter and returns, non-blocking

Thread.sleep(5000); // sleep 5 seconds
inputAdapter.pause();
Thread.sleep(5000); // sleep 5 seconds
inputAdapter.resume();
Thread.sleep(5000); // sleep 5 seconds
inputAdapter.stop();