Class VariableManagementServiceImpl
- All Implemented Interfaces:
VariableManagementService
Consider a statement as follows: select * from MyEvent as A where A.val > var1 and A.val2 > var1 and A.val3 > var2
Upon statement execution we need to guarantee that the same atomic value for all variables is applied for all variable reads (by expressions typically) within the statement.
Designed to support:
- lock-less read of the current and prior version, locked reads for older versions
- atomicity by keeping multiple versions for each variable and a threadlocal that receives the current version each call
- one write lock for all variables (required to coordinate with single global version number), however writes are very fast (entry to collection plus increment an int) and therefore blocking should not be an issue
As an alternative to a version-based design, a read-lock for the variable space could also be used, with the following disadvantages: The write lock may just not be granted unless fair locks are used which are more expensive; And a read-lock is more expensive to acquire for multiple CPUs; A thread-local is still need to deal with "set var1=3, var2=var1+1" assignments where the new uncommitted value must be visible in the local evaluation.
Every new write to a variable creates a new version. Thus when reading variables, readers can ignore newer versions and a read lock is not required in most circumstances.
This algorithm works as follows:
A thread processing an event into the runtimevia sendEvent() calls the "setLocalVersion" method once before processing a statement that has variables. This places into a threadlocal variable the current version number, say version 570.
A statement that reads a variable has an variable node that has a VariableReader handle obtained during validation (example).
The VariableReader takes the version from the threadlocal (570) and compares the version number with the version numbers held for the variable. If the current version is same or lower (520, as old or older) then the threadlocal version, then use the current value. If the current version is higher (571, newer) then the threadlocal version, then go to the prior value. Use the prior value until a version is found that as old or older then the threadlocal version.
If no version can be found that is old enough, output a warning and return the newest version. This should not happen, unless a thread is executing for very long within a single statement such that lifetime-old-version time speriod passed before the thread asks for variable values.
As version numbers are counted up they may reach a boundary. Any write transaction after the boundary is reached performs a roll-over. In a roll-over, all variables version lists are newly created and any existing threads that read versions go against a (old) high-collection, while new threads reading the reset version go against a new low-collection.
The class also allows an optional state handler to be plugged in to handle persistence for variable state. The state handler gets invoked when a variable changes value, and when a variable gets created to obtain the current value from persistence, if any.
-
Field Summary
Modifier and TypeFieldDescriptionprotected static final int
Applicable for each variable if more then the number of versions accumulated, check timestamps to determine if a version can be expired.protected static final int
Sets the boundary above which a reader considers the high-version list of variable values. -
Constructor Summary
ModifierConstructorDescriptionprotected
VariableManagementServiceImpl
(int startVersion, long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler) Ctor.VariableManagementServiceImpl
(long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler) Ctor. -
Method Summary
Modifier and TypeMethodDescriptionvoid
addVariable
(String deploymentId, VariableMetaData metaData, String optionalDeploymentIdContext, DataInputOutputSerde optionalSerde) void
allocateVariableState
(String deploymentId, String variableName, int agentInstanceId, boolean recovery, NullableObject<Object> initialValue, EventBeanTypedEventFactory eventBeanTypedEventFactory) void
checkAndWrite
(String deploymentId, String variableName, int agentInstanceId, Object newValue) Check type of the value supplied and writes the new variable value.void
commit()
Commits the variable outstanding changes.void
deallocateVariableState
(String deploymentId, String variableName, int agentInstanceId) void
destroy()
Returns a reader that provides access to variable values.getReadersPerCP
(String deploymentId, String variableName) Lock for use in atomic writes to the variable space.getVariableMetaData
(String deploymentId, String variableName) void
registerCallback
(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback) Registers a callback invoked when the variable is written with a new value.void
removeVariableIfFound
(String deploymentId, String variableName) Removes a variable.void
rollback()
Rolls back the variable outstanding changes.void
Sets the variable version that subsequent reads consider.void
traverseVariables
(BiConsumer<String, Variable> consumer) void
unregisterCallback
(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback) Removes a callback.void
Writes a new variable value.
-
Field Details
-
ROLLOVER_READER_BOUNDARY
protected static final int ROLLOVER_READER_BOUNDARYSets the boundary above which a reader considers the high-version list of variable values. For use in roll-over when the current version number overflows the ROLLOVER_WRITER_BOUNDARY.- See Also:
-
HIGH_WATERMARK_VERSIONS
protected static final int HIGH_WATERMARK_VERSIONSApplicable for each variable if more then the number of versions accumulated, check timestamps to determine if a version can be expired.- See Also:
-
-
Constructor Details
-
VariableManagementServiceImpl
public VariableManagementServiceImpl(long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler) Ctor.- Parameters:
millisecondLifetimeOldVersions
- number of milliseconds a version may hang around before expirytimeProvider
- provides the current timeoptionalStateHandler
- a optional plug-in that may store variable state and retrieve state upon creationeventBeanTypedEventFactory
- event adapters
-
VariableManagementServiceImpl
protected VariableManagementServiceImpl(int startVersion, long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler) Ctor.- Parameters:
startVersion
- the first version number to start frommillisecondLifetimeOldVersions
- number of milliseconds a version may hang around before expirytimeProvider
- provides the current timeoptionalStateHandler
- a optional plug-in that may store variable state and retrieve state upon creationeventBeanTypedEventFactory
- for finding event types
-
-
Method Details
-
destroy
public void destroy()- Specified by:
destroy
in interfaceVariableManagementService
-
removeVariableIfFound
Description copied from interface:VariableManagementService
Removes a variable.- Specified by:
removeVariableIfFound
in interfaceVariableManagementService
- Parameters:
deploymentId
- deployment idvariableName
- to remove
-
setLocalVersion
public void setLocalVersion()Description copied from interface:VariableManagementService
Sets the variable version that subsequent reads consider.- Specified by:
setLocalVersion
in interfaceVariableManagementService
-
registerCallback
public void registerCallback(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback) Description copied from interface:VariableManagementService
Registers a callback invoked when the variable is written with a new value.- Specified by:
registerCallback
in interfaceVariableManagementService
- Parameters:
deploymentId
- deployment idvariableName
- variable nameagentInstanceId
- agent instance idvariableChangeCallback
- a callback
-
unregisterCallback
public void unregisterCallback(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback) Description copied from interface:VariableManagementService
Removes a callback.- Specified by:
unregisterCallback
in interfaceVariableManagementService
- Parameters:
deploymentId
- deployment idvariableName
- variable nameagentInstanceId
- agent instance idvariableChangeCallback
- a callback
-
addVariable
public void addVariable(String deploymentId, VariableMetaData metaData, String optionalDeploymentIdContext, DataInputOutputSerde optionalSerde) - Specified by:
addVariable
in interfaceVariableManagementService
-
allocateVariableState
public void allocateVariableState(String deploymentId, String variableName, int agentInstanceId, boolean recovery, NullableObject<Object> initialValue, EventBeanTypedEventFactory eventBeanTypedEventFactory) - Specified by:
allocateVariableState
in interfaceVariableManagementService
-
deallocateVariableState
- Specified by:
deallocateVariableState
in interfaceVariableManagementService
-
getVariableMetaData
- Specified by:
getVariableMetaData
in interfaceVariableManagementService
-
getReader
public VariableReader getReader(String deploymentId, String variableName, int agentInstanceIdAccessor) Description copied from interface:VariableManagementService
Returns a reader that provides access to variable values. The reader considers the version currently set via setLocalVersion.- Specified by:
getReader
in interfaceVariableManagementService
- Parameters:
deploymentId
- deployment idvariableName
- the variable that the reader should readagentInstanceIdAccessor
- agent instance id of accessor- Returns:
- reader
-
write
Description copied from interface:VariableManagementService
Writes a new variable value.Must be followed by either a commit or rollback.
- Specified by:
write
in interfaceVariableManagementService
- Parameters:
variableNumber
- the index number of the variable to write (from VariableReader)agentInstanceId
- agent instance idnewValue
- the new value
-
getReadWriteLock
Description copied from interface:VariableManagementService
Lock for use in atomic writes to the variable space.- Specified by:
getReadWriteLock
in interfaceVariableManagementService
- Returns:
- read write lock for external coordinated write
-
commit
public void commit()Description copied from interface:VariableManagementService
Commits the variable outstanding changes.- Specified by:
commit
in interfaceVariableManagementService
-
rollback
public void rollback()Description copied from interface:VariableManagementService
Rolls back the variable outstanding changes.- Specified by:
rollback
in interfaceVariableManagementService
-
checkAndWrite
public void checkAndWrite(String deploymentId, String variableName, int agentInstanceId, Object newValue) throws VariableValueException Description copied from interface:VariableManagementService
Check type of the value supplied and writes the new variable value.Must be followed by either a commit or rollback.
- Specified by:
checkAndWrite
in interfaceVariableManagementService
- Parameters:
deploymentId
- deployment idvariableName
- variable nameagentInstanceId
- agent instance idnewValue
- the new value- Throws:
VariableValueException
-
getReadersPerCP
public ConcurrentHashMap<Integer,VariableReader> getReadersPerCP(String deploymentId, String variableName) - Specified by:
getReadersPerCP
in interfaceVariableManagementService
-
getVariableReadersNonCP
- Specified by:
getVariableReadersNonCP
in interfaceVariableManagementService
-
getOptionalStateHandler
- Specified by:
getOptionalStateHandler
in interfaceVariableManagementService
-
getDeploymentsWithVariables
- Specified by:
getDeploymentsWithVariables
in interfaceVariableManagementService
-
traverseVariables
- Specified by:
traverseVariables
in interfaceVariableManagementService
-