@InterfaceAudience.Private public abstract class Procedure<TEnvironment> extends Object implements Comparable<Procedure<TEnvironment>>
ProcedureExecutor
instance. They are submitted and then the
ProcedureExecutor keeps calling execute(Object)
until the Procedure is done. Execute may
be called multiple times in the case of failure or a restart, so code must be idempotent. The
return from an execute call is either: null to indicate we are done; ourself if there is more to
do; or, a set of sub-procedures that need to be run to completion before the framework resumes
our execution.
The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
ProcedureState enum from protos:
isFailed()
A procedure has executed at least once and has failed. The procedure may
or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
ROLLEDBACK state.isSuccess()
A procedure is completed successfully without exception.isFinished()
As a procedure in FAILED state will be tried forever for rollback, only
condition when scheduler/ executor will drop procedure from further processing is when procedure
state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.isWaiting()
- Procedure is in one of the two waiting states
(ProcedureProtos.ProcedureState.WAITING
, ProcedureProtos.ProcedureState.WAITING_TIMEOUT
).hasLock()
. The lock implementation is up to the implementor. If an entity needs to be
locked for the life of a procedure -- not just the calls to execute -- then implementations
should say so with the holdLock(Object)
method.
And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
implementation is a bit tricky so we add some comments hrre about it.
hasLock()
method final, and add a locked
field in Procedure to record
whether we have the lock. We will set it to true
in
doAcquireLock(Object, ProcedureStore)
and to false
in
doReleaseLock(Object, ProcedureStore)
. The sub classes do not need to manage it any
more.hasLock()
. And when loading, there is a new field in Procedure
called lockedWhenLoading
. We will set it to true
if the locked field in proto
message is true
.locked
field directly to true
by calling
doAcquireLock(Object, ProcedureStore)
is that, during initialization, most procedures
need to wait until master is initialized. So the solution here is that, we introduced a new
method called waitInitialized(Object)
in Procedure, and move the wait master initialized
related code from acquireLock(Object)
to this method. And we added a restoreLock method
to Procedure, if lockedWhenLoading
is true
, we will call the
acquireLock(Object)
to get the lock, but do not set locked
to true. And later
when we call doAcquireLock(Object, ProcedureStore)
and pass the
waitInitialized(Object)
check, we will test lockedWhenLoading
, if it is
true
, when we just set the locked
field to true and return, without actually
calling the acquireLock(Object)
method since we have already called it once.setTimeout(int)
}, and
setTimeoutFailure(Object)
. See TestProcedureEvents and the TestTimeoutEventProcedure
class for an example usage.
There are hooks for collecting metrics on submit of the procedure and on finish. See
updateMetricsOnSubmit(Object)
and updateMetricsOnFinish(Object, long, boolean)
.Modifier and Type | Class and Description |
---|---|
static class |
Procedure.LockState |
Modifier and Type | Field and Description |
---|---|
static long |
NO_PROC_ID |
protected static int |
NO_TIMEOUT |
Constructor and Description |
---|
Procedure() |
Modifier and Type | Method and Description |
---|---|
protected abstract boolean |
abort(TEnvironment env)
The abort() call is asynchronous and each procedure must decide how to deal
with it, if they want to be abortable.
|
protected Procedure.LockState |
acquireLock(TEnvironment env)
The user should override this method if they need a lock on an Entity.
|
protected void |
addStackIndex(int index)
Called by the RootProcedureState on procedure execution.
|
protected void |
afterReplay(TEnvironment env)
Called when the procedure is ready to be added to the queue after
the loading/replay operation.
|
protected void |
beforeReplay(TEnvironment env)
Called when the procedure is loaded for replay.
|
protected void |
bypass(TEnvironment env)
Set the bypass to true.
|
int |
compareTo(Procedure<TEnvironment> other) |
protected void |
completionCleanup(TEnvironment env)
Called when the procedure is marked as completed (success or rollback).
|
protected abstract void |
deserializeStateData(ProcedureStateSerializer serializer)
Called on store load to allow the user to decode the previously serialized
state.
|
protected Procedure<TEnvironment>[] |
doExecute(TEnvironment env)
Internal method called by the ProcedureExecutor that starts the user-level code execute().
|
protected void |
doRollback(TEnvironment env)
Internal method called by the ProcedureExecutor that starts the user-level code rollback().
|
long |
elapsedTime() |
protected abstract Procedure<TEnvironment>[] |
execute(TEnvironment env)
The main code of the procedure.
|
protected int |
getChildrenLatch() |
RemoteProcedureException |
getException() |
long |
getLastUpdate() |
NonceKey |
getNonceKey() |
String |
getOwner() |
long |
getParentProcId() |
protected ProcedureMetrics |
getProcedureMetrics(TEnvironment env)
Override this method to provide procedure specific counters for submitted count, failed
count and time histogram.
|
long |
getProcId() |
static long |
getProcIdHashCode(long procId)
Get an hashcode for the specified Procedure ID
|
String |
getProcName() |
byte[] |
getResult() |
protected static <T> Long |
getRootProcedureId(Map<Long,Procedure<T>> procedures,
Procedure<T> proc)
Helper to lookup the root Procedure ID given a specified procedure.
|
long |
getRootProcId() |
protected int[] |
getStackIndexes() |
ProcedureProtos.ProcedureState |
getState() |
long |
getSubmittedTime() |
int |
getTimeout() |
protected long |
getTimeoutTimestamp()
Timeout of the next timeout.
|
protected boolean |
hasChildren() |
boolean |
hasException() |
boolean |
hasLock()
This is used in conjunction with
holdLock(Object) . |
boolean |
hasOwner() |
boolean |
hasParent() |
boolean |
hasTimeout() |
static boolean |
haveSameParent(Procedure<?> a,
Procedure<?> b) |
protected boolean |
holdLock(TEnvironment env)
Used to keep the procedure lock even when the procedure is yielding or suspended.
|
protected void |
incChildrenLatch()
Called by the ProcedureExecutor on procedure-load to restore the latch state
|
boolean |
isBypass() |
boolean |
isFailed() |
boolean |
isFinished() |
boolean |
isInitializing() |
boolean |
isLockedWhenLoading()
Can only be called when restarting, before the procedure actually being executed, as after we
actually call the
doAcquireLock(Object, ProcedureStore) method, we will reset
lockedWhenLoading to false. |
boolean |
isRunnable() |
boolean |
isSuccess() |
boolean |
isWaiting() |
protected boolean |
isYieldAfterExecutionStep(TEnvironment env)
By default, the procedure framework/executor will try to run procedures start to finish.
|
protected void |
releaseLock(TEnvironment env)
The user should override this method, and release lock if necessary.
|
protected boolean |
removeStackIndex() |
protected abstract void |
rollback(TEnvironment env)
The code to undo what was done by the execute() code.
|
protected abstract void |
serializeStateData(ProcedureStateSerializer serializer)
The user-level code of the procedure may have some state to
persist (e.g.
|
protected void |
setAbortFailure(String source,
String msg) |
protected void |
setChildrenLatch(int numChildren)
Called by the ProcedureExecutor on procedure-load to restore the latch state
|
protected void |
setFailure(RemoteProcedureException exception) |
protected void |
setFailure(String source,
Throwable cause) |
protected void |
setLastUpdate(long lastUpdate)
Called on store load to initialize the Procedure internals after
the creation/deserialization.
|
protected void |
setNonceKey(NonceKey nonceKey)
Called by the ProcedureExecutor to set the value to the newly created procedure.
|
void |
setOwner(String owner) |
void |
setOwner(User owner) |
protected void |
setParentProcId(long parentProcId)
Called by the ProcedureExecutor to assign the parent to the newly created procedure.
|
protected void |
setProcId(long procId)
Called by the ProcedureExecutor to assign the ID to the newly created procedure.
|
protected void |
setResult(byte[] result)
The procedure may leave a "result" on completion.
|
protected void |
setRootProcId(long rootProcId) |
protected void |
setStackIndexes(List<Integer> stackIndexes)
Called on store load to initialize the Procedure internals after
the creation/deserialization.
|
protected void |
setState(ProcedureProtos.ProcedureState state) |
protected void |
setSubmittedTime(long submittedTime)
Called on store load to initialize the Procedure internals after
the creation/deserialization.
|
protected void |
setTimeout(int timeout) |
protected boolean |
setTimeoutFailure(TEnvironment env)
Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
|
protected boolean |
shouldWaitClientAck(TEnvironment env)
By default, the executor will keep the procedure result around util
the eviction TTL is expired.
|
protected void |
skipPersistence() |
String |
toString() |
protected String |
toStringClass() |
protected void |
toStringClassDetails(StringBuilder builder)
Extend the toString() information with the procedure details
e.g.
|
String |
toStringDetails()
Extend the toString() information with more procedure details
|
protected StringBuilder |
toStringSimpleSB()
Build the StringBuilder for the simple form of procedure string.
|
protected void |
toStringState(StringBuilder builder)
Called from
toString() when interpolating Procedure State. |
protected void |
updateMetricsOnFinish(TEnvironment env,
long runtime,
boolean success)
This function will be called just after procedure execution is finished.
|
protected void |
updateMetricsOnSubmit(TEnvironment env)
This function will be called just when procedure is submitted for execution.
|
protected void |
updateTimestamp()
Called by ProcedureExecutor after each time a procedure step is executed.
|
protected boolean |
waitInitialized(TEnvironment env)
The
doAcquireLock(Object, ProcedureStore) will be split into two steps, first, it will
call us to determine whether we need to wait for initialization, second, it will call
acquireLock(Object) to actually handle the lock for this procedure. |
protected boolean |
wasExecuted() |
public static final long NO_PROC_ID
protected static final int NO_TIMEOUT
public boolean isBypass()
protected void bypass(TEnvironment env)
ProcedureExecutor#bypassProcedure(long, long, boolean)
for now.
DO NOT use this method alone, since we can't just bypass one single procedure. We need to
bypass its ancestor too. If your Procedure has set state, it needs to undo it in here.env
- Current environment. May be null because of context; e.g. pretty-printing
procedure WALs where there is no 'environment' (and where Procedures that require
an 'environment' won't be run.protected final void skipPersistence()
protected abstract Procedure<TEnvironment>[] execute(TEnvironment env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException
env
- the environment passed to the ProcedureExecutorProcedureYieldException
- the procedure will be added back to the queue and retried later.InterruptedException
- the procedure will be added back to the queue and retried later.ProcedureSuspendedException
- Signal to the executor that Procedure has suspended itself and
has set itself up waiting for an external event to wake it back up again.protected abstract void rollback(TEnvironment env) throws IOException, InterruptedException
env
- the environment passed to the ProcedureExecutorIOException
- temporary failure, the rollback will retry laterInterruptedException
- the procedure will be added back to the queue and retried laterprotected abstract boolean abort(TEnvironment env)
NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the procedure implementor abort.
protected abstract void serializeStateData(ProcedureStateSerializer serializer) throws IOException
serializer
- stores the serializable stateIOException
protected abstract void deserializeStateData(ProcedureStateSerializer serializer) throws IOException
serializer
- contains the serialized stateIOException
protected boolean waitInitialized(TEnvironment env)
doAcquireLock(Object, ProcedureStore)
will be split into two steps, first, it will
call us to determine whether we need to wait for initialization, second, it will call
acquireLock(Object)
to actually handle the lock for this procedure.
This is because that when master restarts, we need to restore the lock state for all the
procedures to not break the semantic if holdLock(Object)
is true. But the
ProcedureExecutor
will be started before the master finish initialization(as it is part
of the initialization!), so we need to split the code into two steps, and when restore, we just
restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.protected Procedure.LockState acquireLock(TEnvironment env)
execute(Object)
. It calls releaseLock(Object)
after the call to
execute.
If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
Procedure interfering while this Procedure is running, see holdLock(Object)
.
Example: in our Master we can execute request in parallel for different tables. We can create
t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
queued waiting that specific table create to happen.
There are 3 LockState:
protected void releaseLock(TEnvironment env)
protected boolean holdLock(TEnvironment env)
public final boolean hasLock()
holdLock(Object)
. If holdLock(Object)
returns true, the procedure executor will call acquireLock() once and thereafter
not call releaseLock(Object)
until the Procedure is done (Normally, it calls
release/acquire around each invocation of execute(Object)
.holdLock(Object)
protected void beforeReplay(TEnvironment env)
protected void afterReplay(TEnvironment env)
protected void completionCleanup(TEnvironment env)
protected boolean isYieldAfterExecutionStep(TEnvironment env)
env
- the environment passed to the ProcedureExecutorprotected boolean shouldWaitClientAck(TEnvironment env)
env
- the environment passed to the ProcedureExecutorprotected ProcedureMetrics getProcedureMetrics(TEnvironment env)
env
- The environment passed to the procedure executorprotected void updateMetricsOnSubmit(TEnvironment env)
getProcedureMetrics(Object)
returns non-null
ProcedureMetrics
.protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success)
getProcedureMetrics(Object)
returns
non-null ProcedureMetrics
, the default implementation adds runtime of a procedure to a
time histogram for successfully completed procedures. Increments failed counter for failed
procedures.
TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
successfully finished siblings, this function may get called twice in certain cases for certain
procedures. Explore further if this can be called once.env
- The environment passed to the procedure executorruntime
- Runtime of the procedure in millisecondssuccess
- true if procedure is completed successfullyprotected StringBuilder toStringSimpleSB()
public String toStringDetails()
protected String toStringClass()
protected void toStringState(StringBuilder builder)
toString()
when interpolating Procedure
State. Allows decorating
generic Procedure State with Procedure particulars.builder
- Append current ProcedureProtos.ProcedureState
protected void toStringClassDetails(StringBuilder builder)
builder
- the string builder to use to append the proc specific informationpublic long getProcId()
public boolean hasParent()
public long getParentProcId()
public long getRootProcId()
public String getProcName()
public NonceKey getNonceKey()
public long getSubmittedTime()
public String getOwner()
public boolean hasOwner()
protected void setProcId(long procId)
protected void setParentProcId(long parentProcId)
protected void setRootProcId(long rootProcId)
protected void setNonceKey(NonceKey nonceKey)
public void setOwner(String owner)
public void setOwner(User owner)
protected void setSubmittedTime(long submittedTime)
protected void setTimeout(int timeout)
timeout
- timeout interval in msecpublic boolean hasTimeout()
public int getTimeout()
protected void setLastUpdate(long lastUpdate)
protected void updateTimestamp()
public long getLastUpdate()
protected long getTimeoutTimestamp()
public long elapsedTime()
public byte[] getResult()
protected void setResult(byte[] result)
result
- the serialized result that will be passed to the clientpublic boolean isLockedWhenLoading()
doAcquireLock(Object, ProcedureStore)
method, we will reset
lockedWhenLoading
to false.
Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
front of a queue.public boolean isRunnable()
public boolean isInitializing()
public boolean isFailed()
public boolean isSuccess()
public boolean isFinished()
public boolean isWaiting()
protected void setState(ProcedureProtos.ProcedureState state)
public ProcedureProtos.ProcedureState getState()
protected void setFailure(RemoteProcedureException exception)
protected boolean setTimeoutFailure(TEnvironment env)
WAITING_TIMEOUT
by calling setState
method, and throw a
ProcedureSuspendedException
to halt the execution of the procedure, and do not forget a
call setTimeout(int)
method to set the timeout. And you should also override this
method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
timeout event has been handled.public boolean hasException()
public RemoteProcedureException getException()
protected void setChildrenLatch(int numChildren)
protected void incChildrenLatch()
protected boolean hasChildren()
protected int getChildrenLatch()
protected void addStackIndex(int index)
protected boolean removeStackIndex()
protected void setStackIndexes(List<Integer> stackIndexes)
protected boolean wasExecuted()
protected int[] getStackIndexes()
protected Procedure<TEnvironment>[] doExecute(TEnvironment env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException
ProcedureSuspendedException
- This is used when procedure wants to halt processing and
skip out without changing states or releasing any locks held.ProcedureYieldException
InterruptedException
protected void doRollback(TEnvironment env) throws IOException, InterruptedException
IOException
InterruptedException
public int compareTo(Procedure<TEnvironment> other)
compareTo
in interface Comparable<Procedure<TEnvironment>>
public static long getProcIdHashCode(long procId)
protected static <T> Long getRootProcedureId(Map<Long,Procedure<T>> procedures, Procedure<T> proc)
Copyright © 2007–2019 Cloudera. All rights reserved.