com.espertech.esper.epl.variable
Class VariableServiceImpl

java.lang.Object
  extended by com.espertech.esper.epl.variable.VariableServiceImpl
All Implemented Interfaces:
VariableService

public class VariableServiceImpl
extends java.lang.Object
implements VariableService

Variables service for reading and writing variables, and for setting a version number for the current thread to consider variables for.

Consider a statement as follows: select * from MyEvent as A where A.val > var1 and A.val2 > var1 and A.val3 > var2

Upon statement execution we need to guarantee that the same atomic value for all variables is applied for all variable reads (by expressions typically) within the statement.

Designed to support:

  1. lock-less read of the current and prior version, locked reads for older versions
  2. atomicity by keeping multiple versions for each variable and a threadlocal that receives the current version each call
  3. one write lock for all variables (required to coordinate with single global version number), however writes are very fast (entry to collection plus increment an int) and therefore blocking should not be an issue

As an alternative to a version-based design, a read-lock for the variable space could also be used, with the following disadvantages: The write lock may just not be granted unless fair locks are used which are more expensive; And a read-lock is more expensive to acquire for multiple CPUs; A thread-local is still need to deal with "set var1=3, var2=var1+1" assignments where the new uncommitted value must be visible in the local evaluation.

Every new write to a variable creates a new version. Thus when reading variables, readers can ignore newer versions and a read lock is not required in most circumstances.

This algorithm works as follows:

A thread processing an event into the engine via sendEvent() calls the "setLocalVersion" method once before processing a statement that has variables. This places into a threadlocal variable the current version number, say version 570.

A statement that reads a variable has an ExprVariableNode that has a VariableReader handle obtained during validation (example).

The VariableReader takes the version from the threadlocal (570) and compares the version number with the version numbers held for the variable. If the current version is same or lower (520, as old or older) then the threadlocal version, then use the current value. If the current version is higher (571, newer) then the threadlocal version, then go to the prior value. Use the prior value until a version is found that as old or older then the threadlocal version.

If no version can be found that is old enough, output a warning and return the newest version. This should not happen, unless a thread is executing for very long within a single statement such that lifetime-old-version time speriod passed before the thread asks for variable values.

As version numbers are counted up they may reach a boundary. Any write transaction after the boundary is reached performs a roll-over. In a roll-over, all variables version lists are newly created and any existing threads that read versions go against a (old) high-collection, while new threads reading the reset version go against a new low-collection.

The class also allows an optional state handler to be plugged in to handle persistence for variable state. The state handler gets invoked when a variable changes value, and when a variable gets created to obtain the current value from persistence, if any.


Field Summary
protected static int HIGH_WATERMARK_VERSIONS
          Applicable for each variable if more then the number of versions accumulated, check timestamps to determine if a version can be expired.
protected static int ROLLOVER_READER_BOUNDARY
          Sets the boundary above which a reader considers the high-version list of variable values.
 
Fields inherited from interface com.espertech.esper.epl.variable.VariableService
NOCONTEXT_AGENTINSTANCEID
 
Constructor Summary
protected VariableServiceImpl(int startVersion, long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventAdapterService eventAdapterService, VariableStateHandler optionalStateHandler)
          Ctor.
  VariableServiceImpl(long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventAdapterService eventAdapterService, VariableStateHandler optionalStateHandler)
          Ctor.
 
Method Summary
 void allocateVariableState(java.lang.String variableName, int agentInstanceId, StatementExtensionSvcContext extensionServicesContext)
           
 void checkAndWrite(java.lang.String variableName, int agentInstanceId, java.lang.Object newValue)
          Check type of the value supplied and writes the new variable value.
 void commit()
          Commits the variable outstanding changes.
 void createNewVariable(java.lang.String optionalContextName, java.lang.String variableName, java.lang.String variableType, boolean constant, boolean array, boolean arrayOfPrimitive, java.lang.Object value, EngineImportService engineImportService)
          Creates a new variable.
 void deallocateVariableState(java.lang.String variableName, int agentInstanceId)
           
 void destroy()
           
 VariableReader getReader(java.lang.String variableName, int agentInstanceIdAccessor)
          Returns a reader that provides access to variable values.
 java.util.concurrent.ConcurrentHashMap<java.lang.Integer,VariableReader> getReadersPerCP(java.lang.String variableName)
           
 java.util.concurrent.locks.ReadWriteLock getReadWriteLock()
          Lock for use in atomic writes to the variable space.
 VariableMetaData getVariableMetaData(java.lang.String variableName)
           
 java.util.Map<java.lang.String,VariableReader> getVariableReadersNonCP()
          Returns a map of variable name and reader, for thread-safe iteration.
 java.lang.String isContextVariable(java.lang.String variableName)
           
 void registerCallback(java.lang.String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback)
          Registers a callback invoked when the variable is written with a new value.
 void removeVariableIfFound(java.lang.String name)
          Removes a variable.
 void rollback()
          Rolls back the variable outstanding changes.
 void setLocalVersion()
          Sets the variable version that subsequent reads consider.
 java.lang.String toString()
           
 void unregisterCallback(java.lang.String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback)
          Removes a callback.
 void write(int variableNumber, int agentInstanceId, java.lang.Object newValue)
          Writes a new variable value.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

ROLLOVER_READER_BOUNDARY

protected static final int ROLLOVER_READER_BOUNDARY
Sets the boundary above which a reader considers the high-version list of variable values. For use in roll-over when the current version number overflows the ROLLOVER_WRITER_BOUNDARY.

See Also:
Constant Field Values

HIGH_WATERMARK_VERSIONS

protected static final int HIGH_WATERMARK_VERSIONS
Applicable for each variable if more then the number of versions accumulated, check timestamps to determine if a version can be expired.

See Also:
Constant Field Values
Constructor Detail

VariableServiceImpl

public VariableServiceImpl(long millisecondLifetimeOldVersions,
                           TimeProvider timeProvider,
                           EventAdapterService eventAdapterService,
                           VariableStateHandler optionalStateHandler)
Ctor.

Parameters:
millisecondLifetimeOldVersions - number of milliseconds a version may hang around before expiry
timeProvider - provides the current time
optionalStateHandler - a optional plug-in that may store variable state and retrieve state upon creation
eventAdapterService - event adapters

VariableServiceImpl

protected VariableServiceImpl(int startVersion,
                              long millisecondLifetimeOldVersions,
                              TimeProvider timeProvider,
                              EventAdapterService eventAdapterService,
                              VariableStateHandler optionalStateHandler)
Ctor.

Parameters:
startVersion - the first version number to start from
millisecondLifetimeOldVersions - number of milliseconds a version may hang around before expiry
timeProvider - provides the current time
optionalStateHandler - a optional plug-in that may store variable state and retrieve state upon creation
eventAdapterService - for finding event types
Method Detail

destroy

public void destroy()
Specified by:
destroy in interface VariableService

removeVariableIfFound

public void removeVariableIfFound(java.lang.String name)
Description copied from interface: VariableService
Removes a variable.

Specified by:
removeVariableIfFound in interface VariableService
Parameters:
name - to remove

setLocalVersion

public void setLocalVersion()
Description copied from interface: VariableService
Sets the variable version that subsequent reads consider.

Specified by:
setLocalVersion in interface VariableService

registerCallback

public void registerCallback(java.lang.String variableName,
                             int agentInstanceId,
                             VariableChangeCallback variableChangeCallback)
Description copied from interface: VariableService
Registers a callback invoked when the variable is written with a new value.

Specified by:
registerCallback in interface VariableService
variableChangeCallback - a callback

unregisterCallback

public void unregisterCallback(java.lang.String variableName,
                               int agentInstanceId,
                               VariableChangeCallback variableChangeCallback)
Description copied from interface: VariableService
Removes a callback.

Specified by:
unregisterCallback in interface VariableService
variableChangeCallback - a callback

createNewVariable

public void createNewVariable(java.lang.String optionalContextName,
                              java.lang.String variableName,
                              java.lang.String variableType,
                              boolean constant,
                              boolean array,
                              boolean arrayOfPrimitive,
                              java.lang.Object value,
                              EngineImportService engineImportService)
                       throws VariableExistsException,
                              VariableTypeException
Description copied from interface: VariableService
Creates a new variable.

Specified by:
createNewVariable in interface VariableService
variableName - name of the variable
variableType - variable type
Throws:
VariableExistsException - if the variable name is already in use
VariableTypeException - if the variable type cannot be recognized

allocateVariableState

public void allocateVariableState(java.lang.String variableName,
                                  int agentInstanceId,
                                  StatementExtensionSvcContext extensionServicesContext)
Specified by:
allocateVariableState in interface VariableService

deallocateVariableState

public void deallocateVariableState(java.lang.String variableName,
                                    int agentInstanceId)
Specified by:
deallocateVariableState in interface VariableService

getVariableMetaData

public VariableMetaData getVariableMetaData(java.lang.String variableName)
Specified by:
getVariableMetaData in interface VariableService

getReader

public VariableReader getReader(java.lang.String variableName,
                                int agentInstanceIdAccessor)
Description copied from interface: VariableService
Returns a reader that provides access to variable values. The reader considers the version currently set via setLocalVersion.

Specified by:
getReader in interface VariableService
Parameters:
variableName - the variable that the reader should read
Returns:
reader

isContextVariable

public java.lang.String isContextVariable(java.lang.String variableName)
Specified by:
isContextVariable in interface VariableService

write

public void write(int variableNumber,
                  int agentInstanceId,
                  java.lang.Object newValue)
Description copied from interface: VariableService
Writes a new variable value.

Must be followed by either a commit or rollback.

Specified by:
write in interface VariableService
Parameters:
variableNumber - the index number of the variable to write (from VariableReader)
newValue - the new value

getReadWriteLock

public java.util.concurrent.locks.ReadWriteLock getReadWriteLock()
Description copied from interface: VariableService
Lock for use in atomic writes to the variable space.

Specified by:
getReadWriteLock in interface VariableService
Returns:
read write lock for external coordinated write

commit

public void commit()
Description copied from interface: VariableService
Commits the variable outstanding changes.

Specified by:
commit in interface VariableService

rollback

public void rollback()
Description copied from interface: VariableService
Rolls back the variable outstanding changes.

Specified by:
rollback in interface VariableService

checkAndWrite

public void checkAndWrite(java.lang.String variableName,
                          int agentInstanceId,
                          java.lang.Object newValue)
                   throws VariableValueException
Description copied from interface: VariableService
Check type of the value supplied and writes the new variable value.

Must be followed by either a commit or rollback.

Specified by:
checkAndWrite in interface VariableService
newValue - the new value
Throws:
VariableValueException

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

getVariableReadersNonCP

public java.util.Map<java.lang.String,VariableReader> getVariableReadersNonCP()
Description copied from interface: VariableService
Returns a map of variable name and reader, for thread-safe iteration.

Specified by:
getVariableReadersNonCP in interface VariableService
Returns:
variable names and readers

getReadersPerCP

public java.util.concurrent.ConcurrentHashMap<java.lang.Integer,VariableReader> getReadersPerCP(java.lang.String variableName)
Specified by:
getReadersPerCP in interface VariableService

© 2006-2015 EsperTech Inc.
All rights reserved.
Visit us at espertech.com