This chapter discusses the EsperIO adapter for relational databases.
This adapter requires the Esper compiler and the Esper runtime as dependencies.
The EsperIO relational database adapter can write events to a database table.
If your application only reads from tables, the Esper jar file and Esper configuration suffice, and no additional EsperIO DB adapter configuration or jar file is required. See the tips for reading or polling tables at the end of this chapter.
The EsperIO DB adapter supports two means to write to a database table:
Execute a SQL DML (Data Manipulation, i.e. Update, Insert, Delete or stored procedure call) statement as a response to a triggering event.
Execute an Update-Insert: The adapter attempts an Update of a row by key and if unsuccessful (update returns zero rows updated) the adapter performs an Insert.
The adapter also provides infrastructure for queuing database write requests for execution by a thread pool.
You may configure the EsperIO DB adapter either as part of your Esper configuration file, in the plugin loader section, or via the adapter API. Add the esperio-db-version.jar file and the JDBC driver to your classpath. The adapter requires no other dependent jar files.
A sample adapter configuration file is provided in esperio-db-sample-config.xml in the etc folder of the distribution. A configuration file must be valid according to the schema esperio-db-configuration-8-0.xsd.
You may place the configuration XML directly into your Esper configuration file as the example below shows:
    <esper-configuration xmlns="http://www.espertech.com/schema/esper">
      <runtime>
        <plugin-loader name="EsperIODBAdapter" class-name="com.espertech.esperio.db.EsperIODBAdapterPlugin">
          <config-xml>
            <esperio-db-configuration>
              <!-- .....as outlined below or contents of esperio-db-sample-config.xml here... -->
            </esperio-db-configuration>
          </config-xml>
        </plugin-loader>
      </runtime>
    </esper-configuration>
Alternatively you can provide a URL in the Esper configuration file to point to your adapter configuration file:
    <esper-configuration xmlns="http://www.espertech.com/schema/esper">
      <runtime>
        <plugin-loader name="EsperIODBAdapter" class-name="com.espertech.esperio.db.EsperIODBAdapterPlugin">
          <init-arg name="esperio.db.configuration.file" value="file:/path/esperio-db-sample-config.xml" />
        </plugin-loader>
      </runtime>
    </esper-configuration>
If you use Spring, or if your application requires API access, you can configure and start the adapter via the API.
The class for configuring an EsperIO DB adapter is com.espertech.esperio.db.config.ConfigurationDBAdapter. The adapter class itself is EsperIODBAdapter.
The code snippet below is a sample that configures a driver manager connection and starts the adapter via the API:
    ConfigurationDBAdapter adapterConfig = new ConfigurationDBAdapter();
    ConfigurationCommonDBRef configDB = new ConfigurationCommonDBRef();
    configDB.setDriverManagerConnection("DRIVER", "URL", new Properties());
    adapterConfig.getJdbcConnections().put("db1", configDB);

    // add additional configuration such as DML and Upsert

    // start adapter
    EsperIODBAdapter dbAdapter = new EsperIODBAdapter(adapterConfig, "runtimeURI");
    dbAdapter.start();
The configuration for the source of JDBC connections follows the Esper configuration. Please consult the Esper documentation or sample adapter configuration file for details.
Your configuration should set auto-commit to true so that each change to database tables is committed automatically.
The adapter obtains a new connection for each operation and closes the connection after each operation. For optimum performance consider configuring a connection pool.
A sample JDBC connection configuration is shown in the XML below. The API class is ConfigurationCommonDBRef. You may also configure a DataSource or DataSource factory as outlined in the Esper documentation.
    <esperio-db-configuration>
      <jdbc-connection name="db1">
        <drivermanager-connection class-name="com.mysql.jdbc.Driver"
            url="jdbc:mysql://localhost/test" user="root" password="password"/>
        <connection-settings auto-commit="true" catalog="TEST"/>
      </jdbc-connection>
      <!-- Add DML and Upsert configurations here, as below. -->
    </esperio-db-configuration>
This facility allows running a SQL DML (Data Manipulation) query, i.e. an Update, Insert, Delete query or a stored procedure when an event in a triggering stream occurs.
Define a dml element in the adapter configuration file (or use the DMLQuery class) for every query to execute.
The synopsis is as follows:
    <dml connection="[connection]" stream="[stream]"
        [name="name"] [executor-name="executor"] [retry="count"] [retry-interval-sec="sec"]>
      <sql>[sql]</sql>
      <bindings>
        <parameter pos="[position]" property="[property_name]"/>
        [...parameters]
      </bindings>
    </dml>
The connection attribute value is required and provides the name of the configured JDBC connection.
A value for the stream attribute is required and provides the name of the stream that triggers the DML. The adapter expects a stream by this name to exist at adapter start time.
The name attribute is optional and is only used for logging errors.
The executor-name attribute is optional. If specified, it must be the name of an executor configuration. If specified, the adapter will use the executor service (queue and thread pool) for performing all DML work. If not specified, the adapter performs the DML work in the same thread.
The retry attribute is optional. If specified, the adapter will retry a given number of times in case an error is encountered. If retry-interval-sec is specified, the adapter waits the given number of seconds between retries.
The sql element is required and provides the SQL DML or stored procedure call to execute, with parameters as question mark (?).
The bindings element is required and provides the bindings for expression parameters.
The parameter element should occur once for each parameter in the SQL query. The pos attribute starts at 1 and counts up for each parameter. The property attribute provides the name of the event property of the stream to use as the parameter value.
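The retry and retry-interval-sec attributes described above can be pictured as a simple loop. The sketch below is not the adapter's implementation. It assumes that retry counts additional attempts after the first failure and that retry-interval-sec is a fixed pause between attempts. The names RetrySketch and runWithRetry are hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; these names are not part of the EsperIO API.
final class RetrySketch {

    /**
     * Runs the work once and, if it throws, tries again up to `retries` more
     * times, pausing `intervalSec` seconds between attempts (assumed semantics).
     */
    static <T> T runWithRetry(Callable<T> work, int retries, int intervalSec) throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return work.call();
            } catch (Exception ex) {
                last = ex; // remember the failure; retry if attempts remain
                if (attempt < retries && intervalSec > 0) {
                    Thread.sleep(intervalSec * 1000L);
                }
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated DML work that fails twice and then succeeds.
        String result = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated database error");
            }
            return "ok";
        }, 3, 0);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

When an executor is configured, a loop of this kind would run on a pool thread rather than on the thread that delivered the triggering event, so the pause between retries does not block event processing.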
A sample DML configuration follows:
    <dml connection="db1" stream="InsertToDBStream" name="MyInsertQuery"
        executor-name="queue1" retry="3">
      <sql>insert into MyEventStore(key1, value1, value2) values (?, ?, ?)</sql>
      <bindings>
        <parameter pos="1" property="eventProperty1"/>
        <parameter pos="2" property="eventProperty2"/>
        <parameter pos="3" property="eventProperty3"/>
      </bindings>
    </dml>
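To make the binding rule concrete, the following sketch resolves a set of bindings against one event and returns the values in the order of the question marks. It only illustrates the mapping from 1-based positions to event property names. The class BindingSketch, its resolve method, and the event values are hypothetical and not part of the adapter.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical illustration only; these names are not part of the EsperIO API.
final class BindingSketch {

    /** Resolves bindings (1-based position to event property name) into ordered parameter values. */
    static Object[] resolve(Map<Integer, String> bindings, Map<String, Object> event) {
        Object[] params = new Object[bindings.size()];
        for (Map.Entry<Integer, String> binding : bindings.entrySet()) {
            int pos = binding.getKey();
            if (pos < 1 || pos > params.length) {
                throw new IllegalArgumentException("Position out of range: " + pos);
            }
            // Position 1 fills the first question mark, position 2 the second, and so on.
            params[pos - 1] = event.get(binding.getValue());
        }
        return params;
    }

    public static void main(String[] args) {
        Map<Integer, String> bindings =
            Map.of(1, "eventProperty1", 2, "eventProperty2", 3, "eventProperty3");
        // Made-up event property values for the demonstration.
        Map<String, Object> event =
            Map.of("eventProperty1", "key-1", "eventProperty2", 10, "eventProperty3", 2.5);
        System.out.println(Arrays.toString(resolve(bindings, event)));
    }
}
```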
This facility allows running an SQL Update that is followed by an Insert if the Update did not update any rows.
Define an upsert element in the adapter configuration file (or use the UpsertQuery class) for every update-insert to execute.
The synopsis is as follows:
    <upsert connection="[connection]" stream="[stream]" table-name="[table]"
        [name="name"] [executor-name="executor"] [retry="count"] [retry-interval-sec="sec"]>
      <keys>
        <column property="[property_name]" column="[column_name]" type="[sql_type]"/>
        [...column]
      </keys>
      <values>
        <column property="[property_name]" column="[column_name]" type="[sql_type]"/>
        [...column]
      </values>
    </upsert>
The connection attribute value is required and provides the name of the configured JDBC connection.
A value for the stream attribute is required and provides the name of the stream that triggers the Update-Insert. The adapter expects a stream by this name to exist at adapter start time.
The table-name attribute value is required and provides the database table name.
The name attribute is optional and is only used for logging errors.
The executor-name attribute is optional. If specified, it must be the name of an executor configuration. If specified, the adapter will use the executor service (queue and thread pool) for performing all work. If not specified, the adapter performs the work in the same thread.
The retry attribute is optional. If specified, the adapter will retry a given number of times in case an error is encountered. If retry-interval-sec is specified, the adapter waits the given number of seconds between retries.
The keys element is required and provides the key columns of the table and the values element provides the list of value columns of the table.
The column element should occur once for each key column and each value column in the table. The property attribute provides the name of the event property, the column attribute provides the database table column name, and the type attribute is any of the java.sql.Types names (case ignored).
A sample Update-Insert configuration follows:
    <upsert connection="db1" stream="UpdateInsertDBTableTrigger" name="UpdateInsertSample"
        table-name="MyKeyedTable" executor-name="queue1" retry="3">
      <keys>
        <column property="eventProperty1" column="keyColumn1" type="varchar"/>
        <column property="eventProperty2" column="keyColumn2" type="varchar"/>
      </keys>
      <values>
        <column property="eventProperty3" column="valueColumn1" type="varchar"/>
        <column property="eventProperty4" column="valueColumn2" type="integer"/>
      </values>
    </upsert>
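The Update-Insert sequence for a configuration like the one above can be sketched as follows. This chapter does not show the SQL text the adapter generates, so the statement shapes below are an assumption about what an update by key and a matching insert would look like. The class UpsertSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;

// Hypothetical illustration only; these names are not part of the EsperIO API.
final class UpsertSketch {

    /** Builds an update of the value columns restricted by the key columns. */
    static String updateSql(String table, List<String> keys, List<String> values) {
        String set = values.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
        String where = keys.stream().map(c -> c + " = ?").collect(Collectors.joining(" and "));
        return "update " + table + " set " + set + " where " + where;
    }

    /** Builds an insert that lists the key columns first, then the value columns. */
    static String insertSql(String table, List<String> keys, List<String> values) {
        List<String> all = new ArrayList<>(keys);
        all.addAll(values);
        String marks = all.stream().map(c -> "?").collect(Collectors.joining(", "));
        return "insert into " + table + "(" + String.join(", ", all) + ") values (" + marks + ")";
    }

    /** Runs the update and falls back to the insert only if no row was updated. Returns true if it inserted. */
    static boolean updateThenInsert(IntSupplier update, Runnable insert) {
        if (update.getAsInt() > 0) {
            return false; // an existing row was updated, nothing more to do
        }
        insert.run();
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("keyColumn1", "keyColumn2");
        List<String> values = List.of("valueColumn1", "valueColumn2");
        System.out.println(updateSql("MyKeyedTable", keys, values));
        System.out.println(insertSql("MyKeyedTable", keys, values));
        // Simulate an update that matches no row, which triggers the insert.
        boolean inserted = updateThenInsert(() -> 0, () -> System.out.println("insert executed"));
        System.out.println("inserted = " + inserted);
    }
}
```

In a JDBC setting the update callback would typically return the row count from PreparedStatement.executeUpdate, which is the value that decides whether the insert runs.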
Executors are named thread pools and queues that may be assigned to perform DML or update-insert work.
Define an executor element in the adapter configuration file (or use the Executor class) for every thread pool and queue to declare.
Upon adapter start, for each executor the adapter starts the given number of threads and an unbounded queue.
The synopsis is as follows:
    <executors>
      <executor name="[name]" threads="[count]"/>
    </executors>
The name attribute value is required and provides the name of the executor, while the threads attribute specifies the number of threads for the thread pool.
An example executor configuration:
    <executors>
      <executor name="threadPool1" threads="2"/>
    </executors>
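In plain java.util.concurrent terms, a fixed number of threads fed by an unbounded queue corresponds roughly to the construction below. This is a sketch of the concept, not the adapter's code. The class ExecutorSketch and the method newExecutor are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; these names are not part of the EsperIO API.
final class ExecutorSketch {

    /** Creates a pool with a fixed number of threads and an unbounded work queue. */
    static ThreadPoolExecutor newExecutor(int threads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.prestartAllCoreThreads(); // start all threads up front instead of on first use
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newExecutor(2);
        AtomicInteger written = new AtomicInteger();
        // Queue five simulated write requests; two threads work through them.
        for (int i = 0; i < 5; i++) {
            pool.execute(written::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("completed writes: " + written.get());
    }
}
```

Because the queue is unbounded, write requests never block the submitting thread, but a database that is slower than the event rate lets the queue grow without limit. Monitoring the queue depth is advisable in that case.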
An application can obtain a handle to all thread pools and queues via the Esper runtime context:
ExecutorServices execs = (ExecutorServices) runtime.getContext().lookup("EsperIODBAdapter/ExecutorServices");
This section provides sample statements and documentation pointers for using Esper EPL to read from database tables. If your application only reads from a database and does not write to it, no EsperIO configuration or jar file is required.
Please consult the Esper SQL access documentation for more information.
To execute an SQL query repeatedly, Esper provides the opportunity to join a pattern to an SQL statement. The pattern may provide a single interval or crontab schedule, or may contain multiple schedules or combinations thereof via the pattern or operator.
The sample query below simply executes every 10 seconds, retrieving all rows from table MyTable:
select * from pattern[every timer:interval(10)], sql:db1 ['select * from MyTable']
To perform an incremental query, consider utilizing a variable to parameterize your SQL statement so that only new rows are returned.
The next EPL statements create a variable and pass the variable value to the query to poll for new rows only. They assume the timestamp column in the MyTable table holds long-type millisecond values:
    // Create a variable to hold the last poll timestamp
    create variable long VarLastTimestamp = 0

    // Poll every 15 seconds between 8am and 4pm based on variable value
    insert into PollStream
    select * from pattern[every timer:crontab(*, 8-16, *, *, *, */15)],
      sql:db1 ['select * from MyTable where timestamp > ${VarLastTimestamp}']

    // Assign last value to variable
    on PollStream set VarLastTimestamp = timestamp
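The idea behind the incremental poll can be shown without a database: keep a marker, return only rows newer than the marker, then advance the marker. The sketch below models the table as a list of timestamps. The class IncrementalPollSketch is hypothetical. Unlike the EPL above, which assigns the timestamp of each arriving row, the sketch keeps the maximum timestamp so that row order does not matter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; it models the polling idea, not Esper itself.
final class IncrementalPollSketch {

    private long lastTimestamp = 0; // plays the role of VarLastTimestamp

    /** Returns the timestamps newer than the marker and advances the marker past them. */
    List<Long> poll(List<Long> tableTimestamps) {
        List<Long> fresh = new ArrayList<>();
        for (long ts : tableTimestamps) {
            if (ts > lastTimestamp) { // same test as "where timestamp > ${VarLastTimestamp}"
                fresh.add(ts);
            }
        }
        for (long ts : fresh) {
            lastTimestamp = Math.max(lastTimestamp, ts);
        }
        return fresh;
    }

    public static void main(String[] args) {
        IncrementalPollSketch poller = new IncrementalPollSketch();
        System.out.println(poller.poll(List.of(100L, 200L)));        // first poll sees both rows
        System.out.println(poller.poll(List.of(100L, 200L, 300L)));  // second poll sees only the new row
    }
}
```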
A sample statement to read a table at startup time is below:
select * from pattern[timer:interval(0)], sql:db1 ['select * from MyTable']