public class VariableManagementServiceImpl extends Object implements VariableManagementService
Consider a statement as follows: select * from MyEvent as A where A.val > var1 and A.val2 > var1 and A.val3 > var2
Upon statement execution we need to guarantee that the same atomic value for all variables is applied for all variable reads (by expressions typically) within the statement.
Designed to support:
As an alternative to a version-based design, a read lock for the variable space could also be used, with the following disadvantages: the write lock may not be granted unless fair locks are used, which are more expensive; a read lock is more expensive to acquire on multiple CPUs; and a thread-local is still needed to deal with "set var1=3, var2=var1+1" assignments, where the new uncommitted value must be visible in the local evaluation.
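The need for a thread-local in "set var1=3, var2=var1+1" can be illustrated with a minimal sketch. It is not the class's actual code: the name `UncommittedWritesSketch`, its string-keyed maps, and its method bodies are assumptions made for illustration. The point shown is only that the writing thread sees its own uncommitted values while the committed state stays unchanged until commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: uncommitted writes are kept per thread and consulted before committed values. */
final class UncommittedWritesSketch {
    // Committed values, visible to every thread.
    private final Map<String, Object> committed = new HashMap<>();
    // Per-thread uncommitted writes, visible only to the writing thread.
    private final ThreadLocal<Map<String, Object>> uncommitted = ThreadLocal.withInitial(HashMap::new);

    UncommittedWritesSketch(Map<String, Object> initialValues) {
        committed.putAll(initialValues);
    }

    /** Records a write that only the current thread can see until commit. */
    void write(String name, Object value) {
        uncommitted.get().put(name, value);
    }

    /** Reads the thread's own uncommitted value if present, else the committed value. */
    Object read(String name) {
        Map<String, Object> local = uncommitted.get();
        if (local.containsKey(name)) {
            return local.get(name);
        }
        synchronized (committed) {
            return committed.get(name);
        }
    }

    /** Publishes the thread's writes to all readers. */
    void commit() {
        Map<String, Object> local = uncommitted.get();
        synchronized (committed) {
            committed.putAll(local);
        }
        local.clear();
    }

    /** Discards the thread's writes. */
    void rollback() {
        uncommitted.get().clear();
    }

    public static void main(String[] args) {
        Map<String, Object> initial = new HashMap<>();
        initial.put("var1", 1);
        initial.put("var2", 0);
        UncommittedWritesSketch space = new UncommittedWritesSketch(initial);
        // set var1=3, var2=var1+1: the second assignment must see the uncommitted var1.
        space.write("var1", 3);
        space.write("var2", ((Integer) space.read("var1")) + 1);
        space.commit();
        System.out.println(space.read("var1") + ", " + space.read("var2"));
    }
}
```

In this sketch, a rollback simply clears the thread's map, so the committed values are never touched by an abandoned assignment list.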
Every new write to a variable creates a new version. Thus when reading variables, readers can ignore newer versions and a read lock is not required in most circumstances.
This algorithm works as follows:
A thread processing an event into the runtime via sendEvent() calls the "setLocalVersion" method once before processing a statement that has variables. This places the current version number, say version 570, into a threadlocal variable.
A statement that reads a variable has a variable node that holds a VariableReader handle obtained during validation (example).
The VariableReader takes the version from the threadlocal (570) and compares it with the version numbers held for the variable. If the current version is the same or lower (520, as old or older) than the threadlocal version, it uses the current value. If the current version is higher (571, newer) than the threadlocal version, it goes to the prior value, and keeps going back until it finds a version that is as old as or older than the threadlocal version.
If no version can be found that is old enough, output a warning and return the newest version. This should not happen unless a thread executes for so long within a single statement that the lifetime-old-version time period passes before the thread asks for variable values.
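The read steps above can be sketched as follows. This is a simplified illustration under stated assumptions, not the class's implementation: `VersionedReadSketch`, `VersionedValue`, and the string-keyed `write` and `read` methods are hypothetical names, expiry of old versions is left out, and only `setLocalVersion` mirrors a method name from this page.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical sketch of version-based reads: readers ignore versions newer than their snapshot. */
final class VersionedReadSketch {
    /** A value tagged with the version number of the write that produced it. */
    static final class VersionedValue {
        final int version;
        final Object value;

        VersionedValue(int version, Object value) {
            this.version = version;
            this.value = value;
        }
    }

    private volatile int currentVersion = 0;
    private final ThreadLocal<Integer> localVersion = ThreadLocal.withInitial(() -> 0);
    // Per variable: versions ordered newest first.
    private final Map<String, ConcurrentLinkedDeque<VersionedValue>> variables = new ConcurrentHashMap<>();

    /** Called once before a statement is evaluated: pins the version this thread reads. */
    void setLocalVersion() {
        localVersion.set(currentVersion);
    }

    /** Every write creates a new version; writers are serialized. */
    synchronized void write(String name, Object value) {
        int next = currentVersion + 1;
        variables.computeIfAbsent(name, k -> new ConcurrentLinkedDeque<>())
                 .addFirst(new VersionedValue(next, value));
        currentVersion = next;
    }

    /** Walks from the newest version back to the first one as old as or older than the snapshot. */
    Object read(String name) {
        int snapshot = localVersion.get();
        ConcurrentLinkedDeque<VersionedValue> versions = variables.get(name);
        if (versions == null) {
            throw new IllegalArgumentException("Unknown variable: " + name);
        }
        for (VersionedValue candidate : versions) {
            if (candidate.version <= snapshot) {
                return candidate.value;
            }
        }
        // No version is old enough: warn and fall back to the newest version.
        System.err.println("Warning: no version at or below " + snapshot + " for variable " + name);
        return versions.peekFirst().value;
    }

    public static void main(String[] args) {
        VersionedReadSketch space = new VersionedReadSketch();
        space.write("var1", 10);
        space.setLocalVersion();   // the statement pins its snapshot here
        space.write("var1", 20);   // a concurrent write creates a newer version
        System.out.println(space.read("var1")); // still reads the pinned version
    }
}
```

Because each read only compares version numbers, two reads of `var1` within one statement return the same value even if a write lands between them.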
As version numbers are counted up they may reach a boundary. Any write transaction after the boundary is reached performs a roll-over. In a roll-over, all variables' version lists are newly created; any existing threads that read versions go against an (old) high-collection, while new threads reading the reset version go against a new low-collection.
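A roll-over of this kind might look like the single-threaded sketch below. Everything in it is an assumption for illustration: the class name `RolloverSketch`, the two small boundary values, the restart at version 0, and the rule that the old high-collection is dropped once the new counter reaches the reader boundary. It shows only how a reader could choose between the high and the low collection from its pinned version.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical single-variable sketch of a version roll-over with a high and a low collection. */
final class RolloverSketch {
    static final int READER_BOUNDARY = 100; // hypothetical: snapshots above this read the high collection
    static final int WRITER_BOUNDARY = 110; // hypothetical: a write at this version rolls over first

    static final class VersionedValue {
        final int version;
        final Object value;

        VersionedValue(int version, Object value) {
            this.version = version;
            this.value = value;
        }
    }

    private int currentVersion;
    private Deque<VersionedValue> low = new ArrayDeque<>(); // newest first
    private Deque<VersionedValue> high;                     // versions written before the last roll-over

    RolloverSketch(int startVersion, Object initialValue) {
        currentVersion = startVersion;
        low.addFirst(new VersionedValue(startVersion, initialValue));
    }

    int currentVersion() {
        return currentVersion;
    }

    void write(Object value) {
        if (currentVersion >= WRITER_BOUNDARY) {
            // Roll-over: keep the old list for existing readers, restart numbering for new ones.
            Object latest = low.peekFirst().value;
            high = low;
            low = new ArrayDeque<>();
            low.addFirst(new VersionedValue(0, latest));
            currentVersion = 0;
        }
        currentVersion++;
        low.addFirst(new VersionedValue(currentVersion, value));
        if (currentVersion == READER_BOUNDARY) {
            high = null; // assumption: by now no reader still holds a pre-roll-over snapshot
        }
    }

    Object read(int snapshotVersion) {
        Deque<VersionedValue> versions =
                (snapshotVersion > READER_BOUNDARY && high != null) ? high : low;
        for (VersionedValue candidate : versions) {
            if (candidate.version <= snapshotVersion) {
                return candidate.value;
            }
        }
        return versions.peekFirst().value;
    }

    public static void main(String[] args) {
        RolloverSketch variable = new RolloverSketch(109, "a");
        variable.write("b");
        int oldSnapshot = variable.currentVersion(); // pinned before the roll-over
        variable.write("c");                         // triggers the roll-over
        int newSnapshot = variable.currentVersion(); // pinned after the roll-over
        System.out.println(variable.read(oldSnapshot) + " " + variable.read(newSnapshot));
    }
}
```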
The class also allows an optional state handler to be plugged in to handle persistence for variable state. The state handler gets invoked when a variable changes value, and when a variable gets created to obtain the current value from persistence, if any.
Modifier and Type | Field and Description |
---|---|
protected static int | HIGH_WATERMARK_VERSIONS: Applies to each variable: if more than this number of versions has accumulated, check timestamps to determine whether a version can be expired. |
protected static int | ROLLOVER_READER_BOUNDARY: Sets the boundary above which a reader considers the high-version list of variable values. |
Modifier | Constructor and Description |
---|---|
protected | VariableManagementServiceImpl(int startVersion, long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler): Ctor. |
public | VariableManagementServiceImpl(long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler): Ctor. |
Modifier and Type | Method and Description |
---|---|
void | addVariable(String deploymentId, VariableMetaData metaData, String optionalDeploymentIdContext) |
void | allocateVariableState(String deploymentId, String variableName, int agentInstanceId, boolean recovery, NullableObject<Object> initialValue, EventBeanTypedEventFactory eventBeanTypedEventFactory) |
void | checkAndWrite(String deploymentId, String variableName, int agentInstanceId, Object newValue): Checks the type of the value supplied and writes the new variable value. |
void | commit(): Commits the variable outstanding changes. |
void | deallocateVariableState(String deploymentId, String variableName, int agentInstanceId) |
void | destroy() |
Map<String,VariableDeployment> | getDeploymentsWithVariables() |
VariableStateNonConstHandler | getOptionalStateHandler() |
VariableReader | getReader(String deploymentId, String variableName, int agentInstanceIdAccessor): Returns a reader that provides access to variable values. |
ConcurrentHashMap<Integer,VariableReader> | getReadersPerCP(String deploymentId, String variableName) |
ReadWriteLock | getReadWriteLock(): Lock for use in atomic writes to the variable space. |
Variable | getVariableMetaData(String deploymentId, String variableName) |
Map<DeploymentIdNamePair,VariableReader> | getVariableReadersNonCP() |
void | registerCallback(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback): Registers a callback invoked when the variable is written with a new value. |
void | removeVariableIfFound(String deploymentId, String variableName): Removes a variable. |
void | rollback(): Rolls back the variable outstanding changes. |
void | setLocalVersion(): Sets the variable version that subsequent reads consider. |
void | traverseVariables(BiConsumer<String,Variable> consumer) |
void | unregisterCallback(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback): Removes a callback. |
void | write(int variableNumber, int agentInstanceId, Object newValue): Writes a new variable value. |
protected static final int ROLLOVER_READER_BOUNDARY
protected static final int HIGH_WATERMARK_VERSIONS
public VariableManagementServiceImpl(long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler)
Ctor.
Parameters:
millisecondLifetimeOldVersions - number of milliseconds a version may hang around before expiry
timeProvider - provides the current time
optionalStateHandler - an optional plug-in that may store variable state and retrieve state upon creation
eventBeanTypedEventFactory - event adapters
protected VariableManagementServiceImpl(int startVersion, long millisecondLifetimeOldVersions, TimeProvider timeProvider, EventBeanTypedEventFactory eventBeanTypedEventFactory, VariableStateNonConstHandler optionalStateHandler)
Ctor.
Parameters:
startVersion - the first version number to start from
millisecondLifetimeOldVersions - number of milliseconds a version may hang around before expiry
timeProvider - provides the current time
optionalStateHandler - an optional plug-in that may store variable state and retrieve state upon creation
eventBeanTypedEventFactory - for finding event types
public void destroy()
Specified by: destroy in interface VariableManagementService
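public void removeVariableIfFound(String deploymentId, String variableName)
Description copied from interface: VariableManagementService
Removes a variable.
Specified by: removeVariableIfFound in interface VariableManagementService
Parameters:
deploymentId - deployment id
variableName - to remove
public void setLocalVersion()
Description copied from interface: VariableManagementService
Sets the variable version that subsequent reads consider.
Specified by: setLocalVersion in interface VariableManagementService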
public void registerCallback(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback)
Description copied from interface: VariableManagementService
Registers a callback invoked when the variable is written with a new value.
Specified by: registerCallback in interface VariableManagementService
Parameters:
deploymentId - deployment id
variableName - variable name
agentInstanceId - agent instance id
variableChangeCallback - a callback
public void unregisterCallback(String deploymentId, String variableName, int agentInstanceId, VariableChangeCallback variableChangeCallback)
Description copied from interface: VariableManagementService
Removes a callback.
Specified by: unregisterCallback in interface VariableManagementService
Parameters:
deploymentId - deployment id
variableName - variable name
agentInstanceId - agent instance id
variableChangeCallback - a callback
public void addVariable(String deploymentId, VariableMetaData metaData, String optionalDeploymentIdContext)
Specified by: addVariable in interface VariableManagementService
public void allocateVariableState(String deploymentId, String variableName, int agentInstanceId, boolean recovery, NullableObject<Object> initialValue, EventBeanTypedEventFactory eventBeanTypedEventFactory)
Specified by: allocateVariableState in interface VariableManagementService
public void deallocateVariableState(String deploymentId, String variableName, int agentInstanceId)
Specified by: deallocateVariableState in interface VariableManagementService
public Variable getVariableMetaData(String deploymentId, String variableName)
Specified by: getVariableMetaData in interface VariableManagementService
public VariableReader getReader(String deploymentId, String variableName, int agentInstanceIdAccessor)
Description copied from interface: VariableManagementService
Returns a reader that provides access to variable values.
Specified by: getReader in interface VariableManagementService
Parameters:
deploymentId - deployment id
variableName - the variable that the reader should read
agentInstanceIdAccessor - agent instance id of accessor
public void write(int variableNumber, int agentInstanceId, Object newValue)
Description copied from interface: VariableManagementService
Writes a new variable value. Must be followed by either a commit or rollback.
Specified by: write in interface VariableManagementService
Parameters:
variableNumber - the index number of the variable to write (from VariableReader)
agentInstanceId - agent instance id
newValue - the new value
public ReadWriteLock getReadWriteLock()
Description copied from interface: VariableManagementService
Lock for use in atomic writes to the variable space.
Specified by: getReadWriteLock in interface VariableManagementService
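public void commit()
Description copied from interface: VariableManagementService
Commits the variable outstanding changes.
Specified by: commit in interface VariableManagementService
public void rollback()
Description copied from interface: VariableManagementService
Rolls back the variable outstanding changes.
Specified by: rollback in interface VariableManagementService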
public void checkAndWrite(String deploymentId, String variableName, int agentInstanceId, Object newValue) throws VariableValueException
Description copied from interface: VariableManagementService
Checks the type of the value supplied and writes the new variable value. Must be followed by either a commit or rollback.
Specified by: checkAndWrite in interface VariableManagementService
Parameters:
deploymentId - deployment id
variableName - variable name
agentInstanceId - agent instance id
newValue - the new value
Throws:
VariableValueException
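The requirement that a write be followed by either a commit or a rollback suggests a calling pattern like the sketch below. It does not use the real service: `WriteCommitSketchService` is a hypothetical stand-in that copies only four method signatures from this page, `ToyVariableService` is a toy in-memory implementation whose type check is reduced to "must be an Integer", and the agent instance id of 0 is an arbitrary placeholder.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical caller: takes the write lock, writes, then commits or rolls back. */
final class WriteCommitSketch {
    /** Returns true if every assignment was committed, false if all were rolled back. */
    static boolean setAll(WriteCommitSketchService service, String deploymentId, Map<String, Object> newValues) {
        service.getReadWriteLock().writeLock().lock();
        try {
            for (Map.Entry<String, Object> entry : newValues.entrySet()) {
                service.checkAndWrite(deploymentId, entry.getKey(), 0, entry.getValue());
            }
            service.commit();
            return true;
        } catch (RuntimeException ex) {
            service.rollback(); // a failed check leaves no partial update behind
            return false;
        } finally {
            service.getReadWriteLock().writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ToyVariableService service = new ToyVariableService();
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("var1", 3);
        values.put("var2", 4);
        System.out.println(setAll(service, "deployment-1", values));
    }
}

/** Hypothetical stand-in exposing only the four calls used above. */
interface WriteCommitSketchService {
    ReadWriteLock getReadWriteLock();
    void checkAndWrite(String deploymentId, String variableName, int agentInstanceId, Object newValue);
    void commit();
    void rollback();
}

/** Toy implementation so the pattern can run; not related to the real class. */
final class ToyVariableService implements WriteCommitSketchService {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    final Map<String, Object> committed = new HashMap<>();
    private final Map<String, Object> pending = new HashMap<>();

    public ReadWriteLock getReadWriteLock() {
        return lock;
    }

    public void checkAndWrite(String deploymentId, String variableName, int agentInstanceId, Object newValue) {
        if (!(newValue instanceof Integer)) {
            throw new IllegalArgumentException("Variable '" + variableName + "' expects an Integer");
        }
        pending.put(variableName, newValue);
    }

    public void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    public void rollback() {
        pending.clear();
    }
}
```

Holding the write lock across the write and the commit keeps a multi-variable update atomic with respect to other writers, while readers continue to rely on their pinned version.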
public ConcurrentHashMap<Integer,VariableReader> getReadersPerCP(String deploymentId, String variableName)
Specified by: getReadersPerCP in interface VariableManagementService
public Map<DeploymentIdNamePair,VariableReader> getVariableReadersNonCP()
Specified by: getVariableReadersNonCP in interface VariableManagementService
public VariableStateNonConstHandler getOptionalStateHandler()
Specified by: getOptionalStateHandler in interface VariableManagementService
public Map<String,VariableDeployment> getDeploymentsWithVariables()
Specified by: getDeploymentsWithVariables in interface VariableManagementService
public void traverseVariables(BiConsumer<String,Variable> consumer)
Specified by: traverseVariables in interface VariableManagementService
Copyright © 2005–2018. All rights reserved.