/*
 * global_agg_example.java - Aster Execution Engine
 *
 * Teradata Aster® Developer Guide
 *
 * Product: Aster Execution Engine
 * Release Number: 7.00.02
 * Published: July 2017
 * Language: English (United States)
 * Last Update: 2018-04-13
 * dita:mapPath: xnl1494366523182.ditamap
 * dita:ditavalPath: Generic_no_ie_no_tempfilter.ditaval
 * dita:id: ffu1489104705746
 * lifecycle: previous
 * Product Category: Software
 */
import com.asterdata.ncluster.sqlmr.ApiVersion;
import com.asterdata.ncluster.sqlmr.ArgumentClause;
import com.asterdata.ncluster.sqlmr.InputInfo;
import com.asterdata.ncluster.sqlmr.OutputInfo;
import com.asterdata.ncluster.sqlmr.HelpInfo;
import com.asterdata.ncluster.sqlmr.util.AsterVersion;
import com.asterdata.ncluster.sqlmr.data.ColumnDefinition;
import com.asterdata.ncluster.sqlmr.data.MultipleInputs;
import com.asterdata.ncluster.sqlmr.data.PartitionDefinition;
import com.asterdata.ncluster.sqlmr.data.RowEmitter;
import com.asterdata.ncluster.sqlmr.data.RowHolder;
import com.asterdata.ncluster.sqlmr.data.SqlType;
import com.asterdata.ncluster.sqlmr.data.ValueHolder;
import com.asterdata.ncluster.graph.data.VertexKey;
import com.asterdata.ncluster.graph.data.VertexMessageEmitter;
import com.asterdata.ncluster.graph.data.VertexMessageIterator;
import com.asterdata.ncluster.graph.data.VertexState;
import com.asterdata.ncluster.graph.data.Vertex;
import com.asterdata.ncluster.graph.GraphAggregatorType;
import com.asterdata.ncluster.graph.GraphFunction;
import com.asterdata.ncluster.graph.GraphRuntimeContract;
import com.asterdata.ncluster.graph.data.GraphGlobals;
import com.asterdata.ncluster.util.ImmutableList;

import java.util.ArrayList;
import java.util.List;

@HelpInfo(
   usageSyntax = "",
   shortDescription = "Graph function for demonstrating basic aggregator  functionality.",
   longDescription = "",
   inputColumns = "",
   outputColumns = "",
   author = "",
)

/**
* This graph function shows how you can use the global aggregator API.
*
* There are three aggregates: max, sum (continuous) and sum (stepwise).
*
* The function updates the max aggregate only during initialization
* with all of the vertex keys. This way, after initialization,
* the aggregate should have the maximum vertex key.
*
* The two sum aggregates receive the same updates. During initialization, all the
* vertices add -1 to both sums. During operateOnVertex, each vertex adds its
* key value to the sum, twice. Also, the max vertex adds the iteration number
* to the sum.
*
* For example, with three vertices (1, 2, 3), the stepwise sum would be:
*
* After initialization: -1 * 3 = -3
* After iteration 0: 2*(1+2+3) + 0 = 12
* After iteration 1: 2*(1+2+3) + 1 = 13
* After iteration 2: 2*(1+2+3) + 2 = 14
*
* The continuous sum aggregate is just the sum of all the stepwise values
* up to that point.
*/
public final class global_agg_example implements GraphFunction
{
   ListSqlType vertexKeySchema_;

   public global_agg_example(GraphRuntimeContract contract) {
      // Set the vertex message schema to one integer column (dummy).
      ArrayListSqlType vertexMessageSchema =
         new ArrayListSqlType();
      vertexMessageSchema.add(0, SqlType.integer());
      contract.setVertexMessageSchema(
         ImmutableList.elementsOf(vertexMessageSchema));

      // Set the vertex key schema to one integer column
      vertexKeySchema_ = new ArrayListSqlType();
      vertexKeySchema_.add(0, SqlType.integer());

      // Set the output schema to (result varchar).
      ArrayListColumnDefinition outputColumns =
         new ArrayListColumnDefinition();
      outputColumns.add(new ColumnDefinition("result", SqlType.varchar()));
      contract.setOutputInfo(new OutputInfo(outputColumns));

      // Input schema for all aggregators is integer.
      ArrayListColumnDefinition inputCols =
         new ArrayListColumnDefinition();
      inputCols.add(new ColumnDefinition("input1", SqlType.integer()));
      InputInfo aggInput = new InputInfo(new InputInfo.Builder(inputCols));

      // Register max aggregator (continuous).
      GraphRuntimeContract.AggregatorInfo ac =
         GraphRuntimeContract.AggregatorInfo.getBundledAggregator(
            "int_max",
            aggInput,
            ImmutableList.ArgumentClauseof()
            );
      contract.registerAggregator("max", ac,
         GraphAggregatorType.CONTINUOUS);

      // Register aggregator for continuous sum.
      GraphRuntimeContract.AggregatorInfo ac2 =
         GraphRuntimeContract.AggregatorInfo.getBundledAggregator(
            "int_sum",
            aggInput,
            ImmutableList.ArgumentClauseof()
            );
      contract.registerAggregator("sum_continuous", ac2,
         GraphAggregatorType.CONTINUOUS);

      // Register aggregator for stepwise sum.
      GraphRuntimeContract.AggregatorInfo ac3 =
         GraphRuntimeContract.AggregatorInfo.getBundledAggregator(
            "int_sum",
            aggInput,
            ImmutableList.ArgumentClauseof()
            );
      contract.registerAggregator("sum_stepwise", ac3,
         GraphAggregatorType.STEPWISE);

      // Complete the contract.
      contract.complete();
   }

   /**
   * Global aggregator Vertex. This vertex has a list of strings
   * that are used to stage the results that will be emitted at the end.
   */
   @SuppressWarnings("serial")
   static class AggExampleVertex extends Vertex {
      ListString outputStrings =
         new ArrayListString();
      AggExampleVertex(VertexKey vertexKey) {
         super(vertexKey);
      }
   }

   public void initializeVertex(GraphGlobals globals, VertexState vertexState,MultipleInputs inputs) {
      // Get the partition definition.
      PartitionDefinition partDef = inputs.getPartitionDefinition();

      // Add the vertex.
      VertexKey vertexKey = new VertexKey(partDef);
      AggExampleVertex vertex = new AggExampleVertex(vertexKey);
      vertexState.addVertex(vertex);

      globals.updateAggregator("max", partDef);

      // Every vertex adds -1 to the sums, to demonstrate reset.
      RowHolder aggInput = new RowHolder(vertexKeySchema_);
      aggInput.setIntAt(0, -1);
      globals.updateAggregator("sum_continuous", aggInput);
      globals.updateAggregator("sum_stepwise", aggInput);

      String when = "initialization";
      addOutputs(vertex.outputStrings, when, vertex, globals);

      return;
   }

   public void operateOnVertex(GraphGlobals globals,
               VertexState vertexState,
               VertexMessageIterator inputMessages,
               VertexMessageEmitter outputMessages,
               RowEmitter finalRows
               ) {
      AggExampleVertex vertex = (AggExampleVertex) vertexState.getVertex();

      // Update both sum aggregates. In each case, add the vertex key twice. Also add the
      // iteration number once if this is the max vertex.
      RowHolder aggInput = new RowHolder(vertexKeySchema_);

      aggInput.copyFromRow(vertex.getVertexKey());
      globals.updateAggregator("sum_continuous", aggInput);
      globals.updateAggregator("sum_continuous", aggInput);
      globals.updateAggregator("sum_stepwise", aggInput);
      globals.updateAggregator("sum_stepwise", aggInput);
      if (vertex.getVertexKey().getIntAt(0) ==
          globals.getAggregatorValue("max").toInt()) {
         aggInput.setIntAt(0, globals.getIteration());
         globals.updateAggregator("sum_continuous", aggInput);
         globals.updateAggregator("sum_stepwise", aggInput);
      }

      // Emit the results showing the aggregate values during this iteration.
      String when = "iteration " + globals.getIteration();
      addOutputs(vertex.outputStrings, when, vertex, globals);

      // After three iterations, exit.
      if (globals.getIteration() >= 2)
         globals.globalHalt();
   }

   public void undeliverableMessagesHandler(GraphGlobals globals,
               VertexMessageIterator undeliverableMessages) {
      // Ignore.
      return;
   }

   public void emitFinalRows(GraphGlobals globals, VertexState vertexState,
               RowEmitter finalRows) {
      AggExampleVertex vertex = (AggExampleVertex) vertexState.getVertex();

      // Show aggregate values during the emit phase as well.
      String when = "the emit phase";
      addOutputs(vertex.outputStrings, when, vertex, globals);

      for (String message : vertex.outputStrings) {
         finalRows.addString(message);
         finalRows.emitRow();
      }
   }

   /*
   * Generate output results showing the values of the aggregates.
   */
   private void addOutputs(ListString list,
                           String when,
                           Vertex vertex,
                           GraphGlobals globals) {
      String[] aggNames = new String[] {
         "max", "sum_continuous", "sum_stepwise"
      };

      for (String aggName : aggNames) {
         ValueHolder holder =
            globals.getAggregatorValue(aggName);
         String value =
            holder.isNull() ? "NULL" : holder.toString();
         String message =
            "Value of " + aggName +
            " at " + when +
            ", on vertex " + vertex.getVertexKey().getIntAt(0) +
            ": " + value;
         list.add(message);
      }
   }
}