Package com.alibaba.graphscope.graph
Interface AggregatorManager
-
All Known Implementing Classes: AggregatorManagerImpl, AggregatorManagerNettyImpl
public interface AggregatorManager
Provides management for creating and using aggregators.
-
Method Summary
void acceptNettyMessage(NettyMessage aggregatorMessage)
    Accept a message from another worker and aggregate it into this manager.
<A extends org.apache.hadoop.io.Writable> void aggregate(String name, A value)
    Add a new value.
void broadcast(String name, org.apache.hadoop.io.Writable value)
    Broadcast the given value to all workers for the next computation.
<A extends org.apache.hadoop.io.Writable> A getAggregatedValue(String name)
    Return the current aggregated value.
int getNumWorkers()
<R extends org.apache.hadoop.io.Writable> R getReduced(String name)
    Get the reduced value from the previous worker computation.
int getWorkerId()
void init(FFICommunicator communicator)
    Initialize the manager with a Grape::Communicator; the actual logic depends on the implementation.
void postSuperstep()
    Synchronize aggregator values between workers after the superstep.
void preSuperstep()
void reduce(String name, Object value)
<A extends org.apache.hadoop.io.Writable> boolean registerAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass)
    Register an aggregator with a unique name.
<A extends org.apache.hadoop.io.Writable> boolean registerPersistentAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass)
    Register a persistent aggregator with a unique name.
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp)
    Register a reducer to be reduced in the next worker computation, using the given name and operations.
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp, R globalInitialValue)
    Register a reducer to be reduced in the next worker computation, using the given name and operations, starting globally from globalInitialValue.
<A extends org.apache.hadoop.io.Writable> void setAggregatedValue(String name, A value)
    Set the aggregated value.
-
Method Detail
-
getWorkerId
int getWorkerId()
-
getNumWorkers
int getNumWorkers()
-
registerAggregator
<A extends org.apache.hadoop.io.Writable> boolean registerAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException
Register an aggregator with a unique name.
Type Parameters:
A - the aggregated value type
Parameters:
name - the aggregator name
aggregatorClass - the aggregator class
Throws:
InstantiationException
IllegalAccessException
-
registerPersistentAggregator
<A extends org.apache.hadoop.io.Writable> boolean registerPersistentAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException
Register a persistent aggregator with a unique name.
Type Parameters:
A - the aggregated value type
Parameters:
name - the aggregator name
aggregatorClass - the aggregator implementation class
Throws:
InstantiationException
IllegalAccessException
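Neither description says how the two kinds of aggregator differ. In Apache Giraph, whose Aggregator type this interface accepts, a regular aggregator is reset to its initial value at the start of every superstep, while a persistent aggregator keeps its value for the whole application. The sketch below illustrates that distinction only. ToyAggregatorRegistry, startSuperstep, and the long-sum aggregation are hypothetical stand-ins rather than GraphScope classes, and both the reset behavior of the GraphScope implementations and the meaning of the boolean return value (here: false when the name is already registered) are assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory registry of long-sum aggregators; not a GraphScope class. */
public class ToyAggregatorRegistry {
    private final Map<String, Long> values = new HashMap<>();
    private final Set<String> persistent = new HashSet<>();

    /** Assumed contract: returns false when the name is already registered. */
    public boolean registerAggregator(String name) {
        return values.putIfAbsent(name, 0L) == null;
    }

    /** Same as registerAggregator, but the value survives startSuperstep(). */
    public boolean registerPersistentAggregator(String name) {
        boolean added = registerAggregator(name);
        if (added) {
            persistent.add(name);
        }
        return added;
    }

    /** Adds a value to the named sum. */
    public void aggregate(String name, long value) {
        values.merge(name, value, Long::sum);
    }

    public long getAggregatedValue(String name) {
        Long v = values.get(name);
        if (v == null) {
            throw new IllegalArgumentException("unknown aggregator: " + name);
        }
        return v;
    }

    /** Assumed behavior: regular aggregators restart from the neutral value 0. */
    public void startSuperstep() {
        for (Map.Entry<String, Long> e : values.entrySet()) {
            if (!persistent.contains(e.getKey())) {
                e.setValue(0L);
            }
        }
    }

    public static void main(String[] args) {
        ToyAggregatorRegistry registry = new ToyAggregatorRegistry();
        registry.registerAggregator("perStep");
        registry.registerPersistentAggregator("total");

        registry.aggregate("perStep", 5);
        registry.aggregate("total", 5);
        registry.startSuperstep();
        registry.aggregate("perStep", 2);
        registry.aggregate("total", 2);

        System.out.println(registry.getAggregatedValue("perStep")); // 2
        System.out.println(registry.getAggregatedValue("total"));   // 7
    }
}
```

The toy ignores the question of when aggregated values become visible to other workers; it only shows which values are carried across a superstep boundary.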
-
getAggregatedValue
<A extends org.apache.hadoop.io.Writable> A getAggregatedValue(String name)
Return the current aggregated value. The value needs to be initialized if neither aggregate nor setAggregatedValue has been called before.
Parameters:
name - the name of the aggregator
Returns:
the aggregated value
-
setAggregatedValue
<A extends org.apache.hadoop.io.Writable> void setAggregatedValue(String name, A value)
Set the aggregated value. Can be used for initialization or reset.
Parameters:
name - the name of the aggregator
value - the value to be set
-
aggregate
<A extends org.apache.hadoop.io.Writable> void aggregate(String name, A value)
Add a new value. The aggregation operation needs to be commutative and associative.
Parameters:
name - the unique name that refers to an aggregator
value - the value to be aggregated
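The commutativity and associativity requirement matters because values are combined per worker and the partial results are then merged, so the grouping and order are not under the caller's control. The following sketch, which uses a hypothetical AggregationOrderDemo class and plain long values instead of Writable objects, compares addition with subtraction under two different splits of the same values across workers.

```java
import java.util.function.LongBinaryOperator;

/** Hypothetical demo class; it does not use any GraphScope or Giraph types. */
public class AggregationOrderDemo {

    /**
     * Folds each worker's values starting from the neutral value, then merges
     * the per-worker partial results in worker order.
     * Expects at least one worker.
     */
    public static long aggregateAcrossWorkers(
            long[][] perWorker, long neutral, LongBinaryOperator op) {
        long[] partials = new long[perWorker.length];
        for (int w = 0; w < perWorker.length; w++) {
            long acc = neutral;
            for (long v : perWorker[w]) {
                acc = op.applyAsLong(acc, v);
            }
            partials[w] = acc;
        }
        long merged = partials[0];
        for (int w = 1; w < partials.length; w++) {
            merged = op.applyAsLong(merged, partials[w]);
        }
        return merged;
    }

    public static void main(String[] args) {
        // The same three values, distributed over two workers in two ways.
        long[][] splitA = {{1, 2}, {3}};
        long[][] splitB = {{1}, {2, 3}};

        LongBinaryOperator sum = Long::sum;            // commutative and associative
        LongBinaryOperator minus = (a, b) -> a - b;    // neither

        System.out.println(aggregateAcrossWorkers(splitA, 0, sum));   // 6
        System.out.println(aggregateAcrossWorkers(splitB, 0, sum));   // 6
        System.out.println(aggregateAcrossWorkers(splitA, 0, minus)); // 0
        System.out.println(aggregateAcrossWorkers(splitB, 0, minus)); // 4
    }
}
```

Addition yields the same result for both splits, whereas subtraction depends on how the values happen to be distributed, which is why such an operation is unsuitable for an aggregator.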
-
registerReducer
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp)
Register a reducer to be reduced in the next worker computation, using the given name and operations.
Type Parameters:
S - Single value type
R - Reduced value type
Parameters:
name - Name of the reducer
reduceOp - Reduce operations
-
registerReducer
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp, R globalInitialValue)
Register a reducer to be reduced in the next worker computation, using the given name and operations, starting globally from globalInitialValue. The globalInitialValue is reduced only once; each worker still starts from the neutral initial value.
Type Parameters:
S - Single value type
R - Reduced value type
Parameters:
name - Name of the reducer
reduceOp - Reduce operations
globalInitialValue - Global initial value
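The note about globalInitialValue being reduced only once can be illustrated with a small simulation. The sketch below is not the GraphScope implementation: GlobalInitialValueDemo and both of its methods are hypothetical, and plain long values stand in for Writable objects. It contrasts the documented behavior with the mistaken alternative in which every worker would start from the global initial value.

```java
import java.util.function.LongBinaryOperator;

/** Hypothetical demo class; it does not use any GraphScope or Giraph types. */
public class GlobalInitialValueDemo {

    /** Documented behavior: workers start from neutral, the global value is folded in once. */
    public static long reduceWithGlobalInitial(
            long[][] perWorker, long neutral, long globalInitialValue, LongBinaryOperator op) {
        long result = globalInitialValue;
        for (long[] workerValues : perWorker) {
            long partial = neutral;
            for (long v : workerValues) {
                partial = op.applyAsLong(partial, v);
            }
            result = op.applyAsLong(result, partial);
        }
        return result;
    }

    /** Mistaken variant: every worker starts from the global value, so it is counted per worker. */
    public static long reduceSeedingEveryWorker(
            long[][] perWorker, long neutral, long globalInitialValue, LongBinaryOperator op) {
        long result = neutral;
        for (long[] workerValues : perWorker) {
            long partial = globalInitialValue;
            for (long v : workerValues) {
                partial = op.applyAsLong(partial, v);
            }
            result = op.applyAsLong(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] perWorker = {{1, 2}, {3}};
        System.out.println(reduceWithGlobalInitial(perWorker, 0, 100, Long::sum));  // 106
        System.out.println(reduceSeedingEveryWorker(perWorker, 0, 100, Long::sum)); // 206
    }
}
```

With two workers and a sum reducer, seeding every worker would count the initial value twice, which is the outcome the documented behavior avoids.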
-
getReduced
<R extends org.apache.hadoop.io.Writable> R getReduced(String name)
Get the reduced value from the previous worker computation.
Type Parameters:
R - Reduced value type
Parameters:
name - Name of the reducer
Returns:
Reduced value
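The wording "next worker computation" in registerReducer and "previous worker computation" in getReduced suggests a two-phase lifecycle: values are reduced during one computation and become readable only after it ends. The sketch below models that reading under stated assumptions. ToyReducerStore and endComputation are hypothetical names, long values replace Writable objects, and the choice to drop registrations after each computation follows the "next worker computation" phrasing rather than any confirmed GraphScope behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

/** Hypothetical model of the register / reduce / getReduced lifecycle. */
public class ToyReducerStore {
    private final Map<String, LongBinaryOperator> ops = new HashMap<>();
    private final Map<String, Long> current = new HashMap<>();
    private Map<String, Long> previous = new HashMap<>();

    /** Registers a reducer for the upcoming computation, starting from the neutral value. */
    public void registerReducer(String name, LongBinaryOperator op, long neutral) {
        ops.put(name, op);
        current.put(name, neutral);
    }

    /** Folds one value into the reducer registered under the given name. */
    public void reduce(String name, long value) {
        LongBinaryOperator op = ops.get(name);
        if (op == null) {
            throw new IllegalStateException("reducer not registered: " + name);
        }
        current.put(name, op.applyAsLong(current.get(name), value));
    }

    /** Assumed boundary: publishes the results and clears the registrations. */
    public void endComputation() {
        previous = new HashMap<>(current);
        current.clear();
        ops.clear();
    }

    /** Returns the result of the previous computation, or null if there is none. */
    public Long getReduced(String name) {
        return previous.get(name);
    }

    public static void main(String[] args) {
        ToyReducerStore store = new ToyReducerStore();
        store.registerReducer("max", Math::max, Long.MIN_VALUE);
        store.reduce("max", 4);
        store.reduce("max", 9);
        System.out.println(store.getReduced("max")); // null, computation not finished
        store.endComputation();
        System.out.println(store.getReduced("max")); // 9
    }
}
```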
-
broadcast
void broadcast(String name, org.apache.hadoop.io.Writable value)
Broadcast the given value to all workers for the next computation.
Parameters:
name - Name of the broadcast object
value - Value
-
preSuperstep
void preSuperstep()
-
postSuperstep
void postSuperstep()
Synchronize aggregator values between workers after the superstep.
-
init
void init(FFICommunicator communicator)
Initialize the manager with a Grape::Communicator; the actual logic depends on the implementation.
Parameters:
communicator - the communicator
-
acceptNettyMessage
void acceptNettyMessage(NettyMessage aggregatorMessage)
Accept a message from another worker and aggregate it into this manager.
Parameters:
aggregatorMessage - the received message
-