@InterfaceAudience.Private public class ProcedureExecutor<TEnvironment> extends Object
Modifier and Type | Class and Description |
---|---|
static interface |
ProcedureExecutor.ProcedureExecutorListener |
static class |
ProcedureExecutor.Testing
Class with parameters describing how to fail/die when in testing-context.
|
Modifier and Type | Field and Description |
---|---|
static String |
CHECK_OWNER_SET_CONF_KEY |
static String |
EVICT_ACKED_TTL_CONF_KEY |
static String |
EVICT_TTL_CONF_KEY |
static String |
UPGRADE_TO_2_2 |
static String |
WORKER_KEEP_ALIVE_TIME_CONF_KEY |
Constructor and Description |
---|
ProcedureExecutor(org.apache.hadoop.conf.Configuration conf,
TEnvironment environment,
ProcedureStore store) |
ProcedureExecutor(org.apache.hadoop.conf.Configuration conf,
TEnvironment environment,
ProcedureStore store,
ProcedureScheduler scheduler) |
Modifier and Type | Method and Description |
---|---|
boolean |
abort(long procId)
Send an abort notification to the specified procedure.
|
boolean |
abort(long procId,
boolean mayInterruptIfRunning)
Send an abort notification to the specified procedure.
|
void |
addChore(ProcedureInMemoryChore<TEnvironment> chore)
Add a chore procedure to the executor
|
List<Boolean> |
bypassProcedure(List<Long> pids,
long lockWait,
boolean force,
boolean recursive)
Bypass a procedure.
|
NonceKey |
createNonceKey(long nonceGroup,
long nonce)
Create a NonceKey from the specified nonceGroup and nonce.
|
int |
getActiveExecutorCount() |
Collection<Procedure<TEnvironment>> |
getActiveProceduresNoCopy()
Should only be used when starting up, where the procedure workers have not been started.
|
Set<Long> |
getActiveProcIds() |
int |
getCorePoolSize() |
TEnvironment |
getEnvironment() |
long |
getKeepAliveTime(TimeUnit timeUnit) |
protected long |
getLastProcId() |
<T extends Procedure<TEnvironment>> T |
getProcedure(Class<T> clazz,
long procId) |
Procedure<TEnvironment> |
getProcedure(long procId) |
List<Procedure<TEnvironment>> |
getProcedures()
Get procedures.
|
Procedure<TEnvironment> |
getResult(long procId) |
Procedure<TEnvironment> |
getResultOrProcedure(long procId) |
ProcedureStore |
getStore() |
int |
getUrgentPoolSize() |
int |
getWorkerThreadCount() |
void |
init(int numThreads,
boolean abortOnCorruption)
Initialize the procedure executor, but do not start workers.
|
void |
init(int numThreads,
int urgentNumThreads,
boolean abortOnCorruption)
Initialize the procedure executor, but do not start workers.
|
boolean |
isFinished(long procId)
Return true if the procedure is finished.
|
boolean |
isProcedureOwner(long procId,
User user)
Check if the user is this procedure's owner
|
boolean |
isRunning() |
boolean |
isStarted(long procId)
Return true if the procedure is started.
|
void |
join() |
void |
refreshConfiguration(org.apache.hadoop.conf.Configuration conf) |
void |
registerListener(ProcedureExecutor.ProcedureExecutorListener listener) |
long |
registerNonce(NonceKey nonceKey)
Register a nonce for a procedure that is going to be submitted.
|
boolean |
removeChore(ProcedureInMemoryChore<TEnvironment> chore)
Remove a chore procedure from the executor
|
void |
removeResult(long procId)
Mark the specified completed procedure as ready to remove.
|
void |
setFailureResultForNonce(NonceKey nonceKey,
String procName,
User procOwner,
IOException exception)
If the operation failed before the procedure was submitted, we may want to give back the
same error to the requests with the same nonceKey.
|
void |
setKeepAliveTime(long keepAliveTime,
TimeUnit timeUnit) |
void |
startWorkers()
Start the workers.
|
void |
stop() |
long |
submitProcedure(Procedure<TEnvironment> proc)
Add a new root-procedure to the executor.
|
long |
submitProcedure(Procedure<TEnvironment> proc,
NonceKey nonceKey)
Add a new root-procedure to the executor.
|
void |
submitProcedures(Procedure<TEnvironment>[] procs)
Add a set of new root-procedures to the executor.
|
boolean |
unregisterListener(ProcedureExecutor.ProcedureExecutorListener listener) |
void |
unregisterNonceIfProcedureWasNotSubmitted(NonceKey nonceKey)
Remove the NonceKey if the procedure was not submitted to the executor.
|
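The nonce-related methods in the summary above (createNonceKey, registerNonce, submitProcedure with a NonceKey, and unregisterNonceIfProcedureWasNotSubmitted) suggest a retry-safe submission flow. The following is a minimal sketch of that flow using a hypothetical stand-in class, not the HBase implementation. It assumes that registerNonce returns a negative marker (here `NO_PROC_ID`) when the nonce was not registered before, and the previously reserved procedure ID otherwise; the `Key` class and the ID counter are illustrative only.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the nonce bookkeeping; not the HBase class. */
final class NonceFlowSketch {
  /** Stand-in for NonceKey: a (nonceGroup, nonce) pair with value equality. */
  static final class Key {
    final long nonceGroup;
    final long nonce;

    Key(long nonceGroup, long nonce) {
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return nonceGroup == other.nonceGroup && nonce == other.nonce;
    }

    @Override
    public int hashCode() {
      return Objects.hash(nonceGroup, nonce);
    }
  }

  /** Assumed marker meaning "this nonce was not registered before". */
  static final long NO_PROC_ID = -1L;

  private final ConcurrentHashMap<Key, Long> reserved = new ConcurrentHashMap<>();
  private final Set<Long> submitted = ConcurrentHashMap.newKeySet();
  private final AtomicLong nextProcId = new AtomicLong(1);

  Key createNonceKey(long nonceGroup, long nonce) {
    return new Key(nonceGroup, nonce);
  }

  /** First caller reserves an ID and gets NO_PROC_ID; a retry gets the reserved ID. */
  long registerNonce(Key key) {
    Long existing = reserved.get(key);
    if (existing != null) {
      return existing;
    }
    long candidate = nextProcId.getAndIncrement();
    existing = reserved.putIfAbsent(key, candidate);
    return existing == null ? NO_PROC_ID : existing;
  }

  /** The submitted procedure takes the ID reserved for its nonce. */
  long submitProcedure(Key key) {
    Long procId = reserved.get(key);
    if (procId == null) {
      throw new IllegalStateException("nonce was not registered");
    }
    submitted.add(procId);
    return procId;
  }

  /** Releases the reservation only if nothing was submitted under it. */
  void unregisterNonceIfProcedureWasNotSubmitted(Key key) {
    Long procId = reserved.get(key);
    if (procId != null && !submitted.contains(procId)) {
      reserved.remove(key, procId);
    }
  }

  public static void main(String[] args) {
    NonceFlowSketch sketch = new NonceFlowSketch();
    Key key = sketch.createNonceKey(7L, 42L);
    System.out.println("first register: " + sketch.registerNonce(key));
    System.out.println("submitted as:   " + sketch.submitProcedure(key));
    System.out.println("retry register: " + sketch.registerNonce(key));
  }
}
```

In this sketch a retried request with the same nonce is answered with the ID of the original submission rather than creating a second procedure, which is the purpose a nonce usually serves.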
public static final String CHECK_OWNER_SET_CONF_KEY
public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY
public static final String UPGRADE_TO_2_2
public static final String EVICT_TTL_CONF_KEY
public static final String EVICT_ACKED_TTL_CONF_KEY
public ProcedureExecutor(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store)
public ProcedureExecutor(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store, ProcedureScheduler scheduler)
public void init(int numThreads, boolean abortOnCorruption) throws IOException
numThreads - number of threads available for procedure execution.
abortOnCorruption - true if you want to abort your service in case a corrupted procedure is found on replay; otherwise false.
Throws: IOException
public void init(int numThreads, int urgentNumThreads, boolean abortOnCorruption) throws IOException
numThreads - number of threads available for procedure execution.
urgentNumThreads - number of threads available for urgent procedure execution.
abortOnCorruption - true if you want to abort your service in case a corrupted procedure is found on replay; otherwise false.
Throws: IOException
public void startWorkers() throws IOException
Throws: IOException
public void stop()
public void join()
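The split between init ("initialize the procedure executor, but do not start workers") and startWorkers implies a two-phase startup, with stop and join on the way down. The sketch below illustrates that ordering with a hypothetical stand-in class built on plain threads. It is not the HBase implementation; the worker loop and the `startedCount` helper exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in showing init -> startWorkers -> stop -> join. */
final class TwoPhaseExecutorSketch {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final AtomicInteger started = new AtomicInteger();
  private final List<Thread> workers = new ArrayList<>();

  /** Phase 1: create the worker threads, but do not start them. */
  void init(int numThreads) {
    for (int i = 0; i < numThreads; i++) {
      Thread t = new Thread(this::workLoop, "sketch-worker-" + i);
      t.setDaemon(true);
      workers.add(t);
    }
  }

  /** Phase 2: mark the executor as running and start every worker. */
  void startWorkers() {
    running.set(true);
    for (Thread t : workers) {
      t.start();
    }
  }

  /** Ask the workers to exit; does not wait for them. */
  void stop() {
    running.set(false);
  }

  /** Wait until every worker thread has terminated. */
  void join() {
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  boolean isRunning() {
    return running.get();
  }

  int getWorkerThreadCount() {
    return workers.size();
  }

  /** Illustration-only helper: how many workers actually began running. */
  int startedCount() {
    return started.get();
  }

  private void workLoop() {
    started.incrementAndGet();
    while (running.get()) {
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public static void main(String[] args) {
    TwoPhaseExecutorSketch executor = new TwoPhaseExecutorSketch();
    executor.init(2);          // workers exist but are idle
    executor.startWorkers();   // workers begin looping
    executor.stop();           // request shutdown
    executor.join();           // wait for shutdown to complete
    System.out.println("workers started: " + executor.startedCount());
  }
}
```

Separating the two phases leaves a window after init in which state can be inspected or repaired before any worker touches it, which matches the note on getActiveProceduresNoCopy about startup.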
public void refreshConfiguration(org.apache.hadoop.conf.Configuration conf)
public boolean isRunning()
public int getWorkerThreadCount()
public int getCorePoolSize()
public int getUrgentPoolSize()
public int getActiveExecutorCount()
public TEnvironment getEnvironment()
public ProcedureStore getStore()
public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit)
public long getKeepAliveTime(TimeUnit timeUnit)
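Both keep-alive accessors take a TimeUnit, so the value can be written in one unit and read back in another. A minimal sketch of that conversion follows, assuming the value is stored internally in milliseconds; the storage unit and the class name are assumptions of this example, not something this page states.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in: keep-alive stored in milliseconds, converted on access. */
final class KeepAliveSketch {
  private volatile long keepAliveMillis; // assumed internal unit

  void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
    this.keepAliveMillis = timeUnit.toMillis(keepAliveTime);
  }

  long getKeepAliveTime(TimeUnit timeUnit) {
    // TimeUnit.convert truncates toward zero when converting to a coarser unit.
    return timeUnit.convert(keepAliveMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) {
    KeepAliveSketch sketch = new KeepAliveSketch();
    sketch.setKeepAliveTime(2, TimeUnit.MINUTES);
    System.out.println(sketch.getKeepAliveTime(TimeUnit.SECONDS)); // prints 120
  }
}
```

Reading in a coarser unit than the one used for writing loses precision, for example 1500 milliseconds reads back as 1 second.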
public void addChore(ProcedureInMemoryChore<TEnvironment> chore)
chore - the chore to add
public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore)
chore - the chore to remove
public NonceKey createNonceKey(long nonceGroup, long nonce)
nonceGroup
nonce
public long registerNonce(NonceKey nonceKey)
nonceKey - A unique identifier for this operation from the client or process.
public void unregisterNonceIfProcedureWasNotSubmitted(NonceKey nonceKey)
nonceKey - A unique identifier for this operation from the client or process.
public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, IOException exception)
nonceKey - A unique identifier for this operation from the client or process.
procName - name of the procedure, used to inform the user
procOwner - name of the owner of the procedure, used to inform the user
exception - the failure to report to the user
public long submitProcedure(Procedure<TEnvironment> proc)
proc - the new procedure to execute.
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, boolean recursive) throws IOException
A procedure can be bypassed only if:
1. The procedure is in the RUNNABLE, WAITING, or WAITING_TIMEOUT state, or it is a root procedure without any child.
2. No other worker thread is executing it.
3. No child procedure has been submitted.
If all the requirements are met, the procedure and its ancestors will be bypassed and persisted to the WAL.
If the procedure is in the WAITING state, it is set to RUNNABLE and added to the run queue. TODO: What about WAITING_TIMEOUT?
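The three requirements above can be read as a single predicate. The sketch below encodes one literal reading of them, with the force flag relaxing only the second requirement, as the force parameter description suggests. The enum, method, and parameter names are hypothetical, the recursive option is not modeled, and the real executor may apply further checks (such as lock acquisition within lockWait) that are not shown here.

```java
import java.util.EnumSet;

/** Hypothetical encoding of the documented bypass requirements; not the HBase code. */
final class BypassRuleSketch {
  /** Stand-in for the procedure state; only the names used on this page matter here. */
  enum State { INITIALIZING, RUNNABLE, WAITING, WAITING_TIMEOUT, ROLLEDBACK, SUCCESS, FAILED }

  private static final EnumSet<State> BYPASSABLE =
      EnumSet.of(State.RUNNABLE, State.WAITING, State.WAITING_TIMEOUT);

  static boolean canBypass(State state, boolean isRoot, boolean hasChildren,
      boolean executingOnWorker, boolean force) {
    // Requirement 1: bypassable state, or a root procedure without any child.
    boolean stateOk = BYPASSABLE.contains(state) || (isRoot && !hasChildren);
    // Requirement 2: no worker is executing it, unless force is set.
    boolean notExecuting = !executingOnWorker || force;
    // Requirement 3: no child procedure has been submitted.
    boolean noChildSubmitted = !hasChildren;
    return stateOk && notExecuting && noChildSubmitted;
  }

  public static void main(String[] args) {
    // A waiting procedure that nobody is executing qualifies.
    System.out.println(canBypass(State.WAITING, false, false, false, false));
    // A procedure stuck on a worker qualifies only with force.
    System.out.println(canBypass(State.RUNNABLE, false, false, true, false));
    System.out.println(canBypass(State.RUNNABLE, false, false, true, true));
  }
}
```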
pids - the procedure IDs
lockWait - time to wait for the lock
force - if set to true, the procedure is bypassed even if it is executing. This is for procedures that cannot break out during execution (mostly due to bugs). In this case, bypassing the procedure is not enough, since it is already stuck there. The master must be restarted after bypassing so that the problematic procedure executes with bypass=true; in that condition, the procedure can be successfully bypassed.
recursive - we will do an expensive search for children of each pid. EXPENSIVE!
Throws: IOException
public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey)
proc - the new procedure to execute.
nonceKey - the registered unique identifier for this operation from the client or process.
public void submitProcedures(Procedure<TEnvironment>[] procs)
procs - the new procedures to execute.
public boolean abort(long procId)
procId - the procedure to abort
public boolean abort(long procId, boolean mayInterruptIfRunning)
procId - the procedure to abort
mayInterruptIfRunning - if the proc completed at least one step, should it be aborted?
public Procedure<TEnvironment> getProcedure(long procId)
public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId)
public Procedure<TEnvironment> getResult(long procId)
public boolean isFinished(long procId)
procId - the ID of the procedure to check
public boolean isStarted(long procId)
procId - the ID of the procedure to check
public void removeResult(long procId)
procId - the ID of the procedure to remove
public Procedure<TEnvironment> getResultOrProcedure(long procId)
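Taken together, isFinished, getResult, and removeResult suggest a poll, fetch, then acknowledge pattern for callers that wait on a procedure. The sketch below shows one way this could look. `ResultView` is a hypothetical interface that mirrors only those three methods, with `String` standing in for the procedure result, and `FakeView` is an in-memory test double. None of this is the HBase implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical polling helper around isFinished / getResult / removeResult. */
final class ResultPollingSketch {
  /** Mirrors three documented methods; String stands in for the procedure result. */
  interface ResultView {
    boolean isFinished(long procId);
    String getResult(long procId);
    void removeResult(long procId);
  }

  /** Polls up to maxPolls times; returns the result, or null if it never finished. */
  static String awaitResult(ResultView view, long procId, int maxPolls, long sleepMillis) {
    for (int i = 0; i < maxPolls; i++) {
      if (view.isFinished(procId)) {
        String result = view.getResult(procId);
        view.removeResult(procId); // acknowledge: the result may now be removed
        return result;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    }
    return null;
  }

  /** In-memory double that reports "finished" only after a number of polls. */
  static final class FakeView implements ResultView {
    private final Map<Long, String> results = new HashMap<>();
    private int pollsUntilDone;
    boolean acked;

    FakeView(long procId, String result, int pollsUntilDone) {
      this.results.put(procId, result);
      this.pollsUntilDone = pollsUntilDone;
    }

    @Override
    public boolean isFinished(long procId) {
      return results.containsKey(procId) && pollsUntilDone-- <= 0;
    }

    @Override
    public String getResult(long procId) {
      return results.get(procId);
    }

    @Override
    public void removeResult(long procId) {
      acked = true;
    }
  }

  public static void main(String[] args) {
    FakeView view = new FakeView(5L, "done", 2);
    System.out.println(awaitResult(view, 5L, 5, 0L));
  }
}
```

A bounded number of polls keeps the caller from waiting forever on a procedure that never completes; a production caller would more likely bound the wait by elapsed time.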
public boolean isProcedureOwner(long procId, User user)
procId - the target procedure
user - the user
public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy()
Should only be used when starting up, where the procedure workers have not been started. Use getProcedures() for most cases, as it makes a copy and also includes the finished procedures.
public List<Procedure<TEnvironment>> getProcedures()
public void registerListener(ProcedureExecutor.ProcedureExecutorListener listener)
public boolean unregisterListener(ProcedureExecutor.ProcedureExecutorListener listener)
protected long getLastProcId()
Copyright © 2007–2019 Cloudera. All rights reserved.