import com.asterdata.ncluster.sqlmr.ApiVersion; import com.asterdata.ncluster.sqlmr.ArgumentClause; import com.asterdata.ncluster.sqlmr.InputInfo; import com.asterdata.ncluster.sqlmr.OutputInfo; import com.asterdata.ncluster.sqlmr.HelpInfo; import com.asterdata.ncluster.sqlmr.util.AsterVersion; import com.asterdata.ncluster.sqlmr.data.ColumnDefinition; import com.asterdata.ncluster.sqlmr.data.MultipleInputs; import com.asterdata.ncluster.sqlmr.data.PartitionDefinition; import com.asterdata.ncluster.sqlmr.data.RowEmitter; import com.asterdata.ncluster.sqlmr.data.RowHolder; import com.asterdata.ncluster.sqlmr.data.SqlType; import com.asterdata.ncluster.sqlmr.data.ValueHolder; import com.asterdata.ncluster.graph.data.VertexKey; import com.asterdata.ncluster.graph.data.VertexMessageEmitter; import com.asterdata.ncluster.graph.data.VertexMessageIterator; import com.asterdata.ncluster.graph.data.VertexState; import com.asterdata.ncluster.graph.data.Vertex; import com.asterdata.ncluster.graph.GraphAggregatorType; import com.asterdata.ncluster.graph.GraphFunction; import com.asterdata.ncluster.graph.GraphRuntimeContract; import com.asterdata.ncluster.graph.data.GraphGlobals; import com.asterdata.ncluster.util.ImmutableList; import java.util.ArrayList; import java.util.List; @HelpInfo( usageSyntax = "", shortDescription = "Graph function for demonstrating basic aggregator functionality.", longDescription = "", inputColumns = "", outputColumns = "", author = "", ) /** * This graph function shows how you can use the global aggregator API. * * There are three aggregates: max, sum (continuous) and sum (stepwise). * * The function updates the max aggregate only during initialization * with all of the vertex keys. This way, after initialization, * the aggregate should have the maximum vertex key. * * The two sum aggregates receive the same updates. During initialization, all the * vertices add -1 to both sums. During operateOnVertex, each vertex adds its * key value to the sum, twice. Also, the max vertex adds the iteration number * to the sum. * * For example, with three vertices (1, 2, 3), the stepwise sum would be: * * After initialization: -1 * 3 = -3 * After iteration 0: 2*(1+2+3) + 0 = 12 * After iteration 1: 2*(1+2+3) + 1 = 13 * After iteration 2: 2*(1+2+3) + 2 = 14 * * The continuous sum aggregate is just the sum of all the stepwise values * up to that point. */ public final class global_agg_example implements GraphFunction { ListSqlType vertexKeySchema_; public global_agg_example(GraphRuntimeContract contract) { // Set the vertex message schema to one integer column (dummy). ArrayListSqlType vertexMessageSchema = new ArrayListSqlType(); vertexMessageSchema.add(0, SqlType.integer()); contract.setVertexMessageSchema( ImmutableList.elementsOf(vertexMessageSchema)); // Set the vertex key schema to one integer column vertexKeySchema_ = new ArrayListSqlType(); vertexKeySchema_.add(0, SqlType.integer()); // Set the output schema to (result varchar). ArrayListColumnDefinition outputColumns = new ArrayListColumnDefinition(); outputColumns.add(new ColumnDefinition("result", SqlType.varchar())); contract.setOutputInfo(new OutputInfo(outputColumns)); // Input schema for all aggregators is integer. ArrayListColumnDefinition inputCols = new ArrayListColumnDefinition(); inputCols.add(new ColumnDefinition("input1", SqlType.integer())); InputInfo aggInput = new InputInfo(new InputInfo.Builder(inputCols)); // Register max aggregator (continuous). GraphRuntimeContract.AggregatorInfo ac = GraphRuntimeContract.AggregatorInfo.getBundledAggregator( "int_max", aggInput, ImmutableList.ArgumentClauseof() ); contract.registerAggregator("max", ac, GraphAggregatorType.CONTINUOUS); // Register aggregator for continuous sum. GraphRuntimeContract.AggregatorInfo ac2 = GraphRuntimeContract.AggregatorInfo.getBundledAggregator( "int_sum", aggInput, ImmutableList.ArgumentClauseof() ); contract.registerAggregator("sum_continuous", ac2, GraphAggregatorType.CONTINUOUS); // Register aggregator for stepwise sum. GraphRuntimeContract.AggregatorInfo ac3 = GraphRuntimeContract.AggregatorInfo.getBundledAggregator( "int_sum", aggInput, ImmutableList.ArgumentClauseof() ); contract.registerAggregator("sum_stepwise", ac3, GraphAggregatorType.STEPWISE); // Complete the contract. contract.complete(); } /** * Global aggregator Vertex. This vertex has a list of strings * that are used to stage the results that will be emitted at the end. */ @SuppressWarnings("serial") static class AggExampleVertex extends Vertex { ListString outputStrings = new ArrayListString(); AggExampleVertex(VertexKey vertexKey) { super(vertexKey); } } public void initializeVertex(GraphGlobals globals, VertexState vertexState,MultipleInputs inputs) { // Get the partition definition. PartitionDefinition partDef = inputs.getPartitionDefinition(); // Add the vertex. VertexKey vertexKey = new VertexKey(partDef); AggExampleVertex vertex = new AggExampleVertex(vertexKey); vertexState.addVertex(vertex); globals.updateAggregator("max", partDef); // Every vertex adds -1 to the sums, to demonstrate reset. RowHolder aggInput = new RowHolder(vertexKeySchema_); aggInput.setIntAt(0, -1); globals.updateAggregator("sum_continuous", aggInput); globals.updateAggregator("sum_stepwise", aggInput); String when = "initialization"; addOutputs(vertex.outputStrings, when, vertex, globals); return; } public void operateOnVertex(GraphGlobals globals, VertexState vertexState, VertexMessageIterator inputMessages, VertexMessageEmitter outputMessages, RowEmitter finalRows ) { AggExampleVertex vertex = (AggExampleVertex) vertexState.getVertex(); // Update both sum aggregates. In each case, add the vertex key twice. Also add the // iteration number once if this is the max vertex. RowHolder aggInput = new RowHolder(vertexKeySchema_); aggInput.copyFromRow(vertex.getVertexKey()); globals.updateAggregator("sum_continuous", aggInput); globals.updateAggregator("sum_continuous", aggInput); globals.updateAggregator("sum_stepwise", aggInput); globals.updateAggregator("sum_stepwise", aggInput); if (vertex.getVertexKey().getIntAt(0) == globals.getAggregatorValue("max").toInt()) { aggInput.setIntAt(0, globals.getIteration()); globals.updateAggregator("sum_continuous", aggInput); globals.updateAggregator("sum_stepwise", aggInput); } // Emit the results showing the aggregate values during this iteration. String when = "iteration " + globals.getIteration(); addOutputs(vertex.outputStrings, when, vertex, globals); // After three iterations, exit. if (globals.getIteration() >= 2) globals.globalHalt(); } public void undeliverableMessagesHandler(GraphGlobals globals, VertexMessageIterator undeliverableMessages) { // Ignore. return; } public void emitFinalRows(GraphGlobals globals, VertexState vertexState, RowEmitter finalRows) { AggExampleVertex vertex = (AggExampleVertex) vertexState.getVertex(); // Show aggregate values during the emit phase as well. String when = "the emit phase"; addOutputs(vertex.outputStrings, when, vertex, globals); for (String message : vertex.outputStrings) { finalRows.addString(message); finalRows.emitRow(); } } /* * Generate output results showing the values of the aggregates. */ private void addOutputs(ListString list, String when, Vertex vertex, GraphGlobals globals) { String[] aggNames = new String[] { "max", "sum_continuous", "sum_stepwise" }; for (String aggName : aggNames) { ValueHolder holder = globals.getAggregatorValue(aggName); String value = holder.isNull() ? "NULL" : holder.toString(); String message = "Value of " + aggName + " at " + when + ", on vertex " + vertex.getVertexKey().getIntAt(0) + ": " + value; list.add(message); } } }