Implementation - Aster Execution Engine

Teradata Aster® Developer Guide

Product
Aster Execution Engine
Release Number
7.00.02
Published
July 2017
Language
English (United States)
Last Update
2018-04-13
dita:mapPath
xnl1494366523182.ditamap
dita:ditavalPath
Generic_no_ie_no_tempfilter.ditaval
dita:id
ffu1489104705746
lifecycle
previous
Product Category
Software
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import com.asterdata.ncluster.sqlmr.ApiVersion;
import com.asterdata.ncluster.sqlmr.ArgumentClause;
import com.asterdata.ncluster.sqlmr.ClientVisibleException;
import com.asterdata.ncluster.sqlmr.HelpInfo;
import com.asterdata.ncluster.sqlmr.InputInfo;
import com.asterdata.ncluster.sqlmr.OutputInfo;
import com.asterdata.ncluster.sqlmr.util.AsterVersion;

import com.asterdata.ncluster.sqlmr.data.ColumnDefinition;
import com.asterdata.ncluster.sqlmr.data.MultipleInputs;
import com.asterdata.ncluster.sqlmr.data.RowEmitter;
import com.asterdata.ncluster.sqlmr.data.RowIterator;
import com.asterdata.ncluster.sqlmr.data.SqlType;
import com.asterdata.ncluster.sqlmr.data.RowHolder;
import com.asterdata.ncluster.sqlmr.data.RowView;
import com.asterdata.ncluster.sqlmr.data.ValueHolder;
import com.asterdata.ncluster.sqlmr.data.ValueView;
import com.asterdata.ncluster.sqlmr.data.ValueViewComparator;

import com.asterdata.ncluster.graph.data.Edge;
import com.asterdata.ncluster.graph.data.EdgeIterator;
import com.asterdata.ncluster.graph.data.GraphGlobals;
import com.asterdata.ncluster.graph.data.Vertex;
import com.asterdata.ncluster.graph.data.VertexKey;
import com.asterdata.ncluster.graph.data.VertexMessageEmitter;
import com.asterdata.ncluster.graph.data.VertexMessageIterator;
import com.asterdata.ncluster.graph.data.VertexState;
import com.asterdata.ncluster.graph.GraphFunction;
import com.asterdata.ncluster.graph.GraphRuntimeContract;

import com.asterdata.ncluster.util.ImmutableList;

@HelpInfo(usageSyntax = "SimpleSampleSearch("
         + "on verticesTable as vertices_alias partition by vertex_id...  "
         + "on edgesTable as edges_alias partition by src_id...  "
         + "start_vertex(integer), end_vertex(integer)) ",
         shortDescription = "Searches for the path with the fewest edges "
         + "between a specified start vertex and end vertex.",
         longDescription = "SimpleSampleSearch Algorithm: "
         + "SimpleSampleSearch searches for the path with the fewest edges "
         + "between a specified start vertex and end vertex.  "
         + "This function has two arguments -"
         + "1.'start_vertex()' - vertex key of the vertex to start from."
         + "2.'end_vertex()' - the vertex key of the vertex we want to reach.",
         inputColumns = "For the vertices_alias, the columns must include:\n"
         + "  vertex_ID INTEGER, -- a unique identifier.\n"
         + "  cityName CHAR(30)\n"
         + "For the edges_alias, the columns must include:\n"
         + "  edge_ID INTEGER,   -- Not used for much except DISTRIBUTE BY.\n"
         + "  src_ID DOUBLE,     -- Vertex ID of the source vertex.\n"
         + "  dest_id DOUBLE,    -- Vertex ID of the target vertex.\n"
         + "  distance DOUBLE,   -- Distance from the start vertex to the end"
         + "     vertex (km).\n"
         + "  weight DOUBLE      -- Edge's 'weight' (influence or capacity)\n"
         + "  sampling_weight DOUBLE\n",
         outputColumns = "(Distance double precision)",

public class SimpleSampleSearch implements GraphFunction {

   // Vertex keys of the start and end vertices, built from the
   // start_vertex and end_vertex arguments of the SQL statement.
   private VertexKey startVertexKey_ = null;
   private VertexKey endVertexKey_ = null;
   // Schema (column types) of a vertex key, taken from the partitioning
   // expression of the vertices input.
   private ImmutableList<SqlType> vertexKeySchema_ = null;

   // Used when comparing vertex keys.
   private VertexKeyComparator vkComparator_ = null;

   // Column positions within a row of the vertices input.
   private static final int VTX__ID_COL_ = 0;
   private static final int VTX__CITYNAME_COL_ = VTX__ID_COL_ + 1;

   // Column positions within a row of the edges input. Each edge row carries
   // the vertex IDs of its source and destination vertices, so these indexes
   // let the code pull a specific value (for example the destination vertex
   // ID) out of an input row.
   private static final int EDGE__ID_COL_ = 0;
   private static final int EDGE__SRC_ID_COL_ = EDGE__ID_COL_ + 1;
   private static final int EDGE__DEST_ID_COL_ = EDGE__SRC_ID_COL_ + 1;
   private static final int EDGE__DISTANCE_COL_ = EDGE__DEST_ID_COL_ + 1;
   private static final int EDGE__WEIGHT_COL_ = EDGE__DISTANCE_COL_ + 1;
   private static final int EDGE__SAMPLING_WEIGHT_COL_ = EDGE__WEIGHT_COL_ + 1;

   // Column position within a vertex message: the cumulative distance up to
   // and including the receiving vertex.
   private static final int MSG__DISTANCE_COL = 0;

   /**
    * Completes the runtime contract: reads the vertex key schema from the
    * "vertices_alias" input, converts the start_vertex and end_vertex
    * arguments into vertex keys, declares the vertex message schema (one
    * double precision column) and the output schema (one double precision
    * column named "distance"), and then marks the contract complete.
    *
    * @param contract the GraphRuntimeContract to read input and argument
    *     information from and to fill in with the message and output schemas.
    */
   public SimpleSampleSearch(GraphRuntimeContract contract) {

      // The vertex key schema is the partitioning-expression schema of the
      // vertices input. "vertices_alias" is the alias given to the vertices
      // table in the graph function call in the SQL statement.
      InputInfo inputInfo = contract.getInputInfo("vertices_alias");
      vertexKeySchema_ = inputInfo.getRowPartitioningExpressionTypes();

      // The schema must be set before these calls, because
      // getVertexKeyFromArgument() builds its RowHolder from it.
      startVertexKey_ = getVertexKeyFromArgument(contract, "start_vertex");
      endVertexKey_ = getVertexKeyFromArgument(contract, "end_vertex");

      // Create an object that can be used to compare vertex keys.
      vkComparator_ = new VertexKeyComparator(vertexKeySchema_);

      // Messages carry the cumulative distance so far along the path from
      // the start vertex, so the message schema is a single double
      // precision column.
      List<SqlType> vertexMessageSchema = new ArrayList<SqlType>();
      vertexMessageSchema.add(SqlType.doublePrecision());
      contract.setVertexMessageSchema(
         ImmutableList.elementsOf(vertexMessageSchema));

      // The output table has one column: the cumulative distance from the
      // start vertex.
      List<ColumnDefinition> outputColumns =
         new ArrayList<ColumnDefinition>();
      outputColumns.add(
         new ColumnDefinition("distance", SqlType.doublePrecision()));
      contract.setOutputInfo(new OutputInfo(outputColumns));

      // Mark the contract as complete.
      contract.complete();

   }// SimpleSampleSearch constructor

/**
* Reads the named argument clause and returns its first value as a
* VertexKey. This is not generic: it assumes the vertex key consists of
* exactly one column that can be set as a "double".
*
* @param contract the GraphRuntimeContract that exposes the argument
*     clauses of the SQL statement.
* @param argumentName name of the argument clause holding the vertex ID
*     (for example "start_vertex" or "end_vertex").
* @return a VertexKey whose single column holds the parsed vertex ID.
* @throws ClientVisibleException if the argument has no value or its value
*     is not a number.
*/
private VertexKey getVertexKeyFromArgument(GraphRuntimeContract contract,
      String argumentName)
{
   ArgumentClause vertexArg = contract.useArgumentClause(argumentName);
   ImmutableList<String> vertexArgValues = vertexArg.getValues();
   Iterator<String> vertexArgValuesIterator = vertexArgValues.iterator();

   // Only the first value is used (assumes a single-column vertex key).
   // Report a missing value to the client instead of letting a raw
   // NoSuchElementException escape.
   if (!vertexArgValuesIterator.hasNext()) {
      throw new ClientVisibleException("SimpleSampleSearch: argument '"
         + argumentName + "' requires a vertex ID value.");
   }
   String vertexIdAsString = vertexArgValuesIterator.next();
   if (vertexIdAsString == null) {
      throw new ClientVisibleException("SimpleSampleSearch: argument '"
         + argumentName + "' requires a vertex ID value.");
   }

   double vertexId;
   try {
      vertexId = Double.parseDouble(vertexIdAsString);
   } catch (NumberFormatException e) {
      throw new ClientVisibleException("SimpleSampleSearch: argument '"
         + argumentName + "' must be a number, but was '"
         + vertexIdAsString + "'.");
   }

   // Build a one-column row holding the vertex ID and wrap it in a key.
   RowHolder vertexKeyValues = new RowHolder(vertexKeySchema_);
   vertexKeyValues.setDoubleAt(0, vertexId);
   return new VertexKey(vertexKeyValues);
}

/**
* Builds the vertex for the current input partition and attaches its
* outgoing edges. The vertex key is copied from the partition definition,
* the city name is read from the single expected row of the
* "vertices_alias" input, and one edge is created for every row of the
* "edges_alias" input, using that row's destination ID, sampling weight,
* distance and weight.
*
* @param globals is a {@linkplain GraphGlobals} providing access to common
*     graph global state, including the current iteration number.
* @param vtxState is a {@linkplain VertexState} that receives the new
*     vertex and its edges.
* @param inputs is a {@linkplain MultipleInputs} providing access to the
*     rows of the current partition for each input alias.
* @throws ClientVisibleException if the partition has no vertex row, or if
*     an edge row has NULL in its destination ID, sampling weight, distance
*     or weight column.
*/
public void initializeVertex(GraphGlobals globals, VertexState vtxState,
         MultipleInputs inputs) {

   // A standard prefix for any error messages we need to report.
   String errMsg = "SimpleSampleSearch.initializeVertex(): ";

   // The SQL query is partitioned on the vertex key, so the partition
   // definition is the key of the vertex we are about to create.
   RowHolder vertexKeyValues = new RowHolder(vertexKeySchema_);
   vertexKeyValues.copyFromRow((RowView) inputs.getPartitionDefinition());
   VertexKey vertexKey = new VertexKey(vertexKeyValues);

   // Read the vertex row. "vertices_alias" is the alias of the vertices
   // table in the SELECT statement. Only the first row is read, and its
   // city name is assumed not to be NULL.
   RowIterator vertexRowIterator = inputs.getRowIterator("vertices_alias");
   if (!vertexRowIterator.advanceToNextRow()) {
      String msg = errMsg + "vertex iterator was empty. Possible causes " +
         "include an edge whose src_id (ID of the starting vertex) " +
         "does not match the vertex ID of any vertex.";
      throw new ClientVisibleException(msg);
   }
   String cityName = vertexRowIterator.getStringAt(VTX__CITYNAME_COL_);

   // The vertex is stored "in" the vertex state, not the other way around.
   vtxState.addVertex(new TransitVertex(vertexKey, cityName));

   // Create one outgoing edge per row of the edges input.
   RowIterator edgeTableIterator = inputs.getRowIterator("edges_alias");
   while (edgeTableIterator.advanceToNextRow()) {
      // The destination ID was previously read without a NULL check,
      // unlike the other edge columns; it is validated the same way now.
      double destVertexKeyAsDouble = readRequiredDouble(edgeTableIterator,
         EDGE__DEST_ID_COL_, "EDGE__DEST_ID_COL_", errMsg);
      double samplingWeight = readRequiredDouble(edgeTableIterator,
         EDGE__SAMPLING_WEIGHT_COL_, "EDGE__SAMPLING_WEIGHT_COL_", errMsg);
      double distance = readRequiredDouble(edgeTableIterator,
         EDGE__DISTANCE_COL_, "EDGE__DISTANCE_COL_", errMsg);
      double weight = readRequiredDouble(edgeTableIterator,
         EDGE__WEIGHT_COL_, "EDGE__WEIGHT_COL_", errMsg);

      // Build the key of the destination vertex (single "double" column).
      RowHolder destVertexKeyValue = new RowHolder(vertexKeySchema_);
      destVertexKeyValue.setDoubleAt(0, destVertexKeyAsDouble);
      VertexKey destVertexKey = new VertexKey(destVertexKeyValue);

      // Construct the edge and add it to the vertex's VertexState.
      TransitEdge transitEdge =
         new TransitEdge(destVertexKey, samplingWeight, distance);
      transitEdge.setWeight(weight);
      vtxState.addEdge(transitEdge);
   }// for each row corresponding to an edge

}// initializeVertex

/**
* Returns the double stored in the given column of the iterator's current
* row, rejecting NULL.
*
* @param rows iterator positioned on the row to read.
* @param column index of the column to read.
* @param columnLabel name of the column constant, used in the error text.
* @param errPrefix prefix identifying the caller in the error text.
* @return the column's value.
* @throws ClientVisibleException if the column is NULL in the current row.
*/
private double readRequiredDouble(RowIterator rows, int column,
      String columnLabel, String errPrefix) {
   if (rows.isNullAt(column)) {
      throw new ClientVisibleException(
         errPrefix + "edgeTableIterator: " + columnLabel + ": NULL: ");
   }
   return rows.getDoubleAt(column);
}

/**
* Sends one message along every outgoing edge of the given vertex. Each
* message carries the distance up to and including the receiving vertex,
* that is, distanceSoFar plus the distance stored on the edge.
*
* @param distanceSoFar the distance from the start vertex up to and
*     including the sending vertex.
* @param vertexState the vertexState of the vertex sending the message.
* @param outputMessages an emitter for constructing and sending messages
*     to other vertices.
*/
private void sendMessages(double distanceSoFar,
 VertexState vertexState, VertexMessageEmitter outputMessages)
{
   EdgeIterator edgeIterator = vertexState.getEdgeIterator();
   try {
      while (edgeIterator.advanceToNextEdge()) {
         // Take the next outgoing edge, look up the key of its destination
         // vertex, build the message payload, and emit the message.
         TransitEdge currentEdge = (TransitEdge) edgeIterator.getEdge();
         VertexKey targetVertexKey = currentEdge.getTargetVertexKey();
         double totalDistance = distanceSoFar + currentEdge.getDistance();
         outputMessages.addDouble(totalDistance);
         outputMessages.emitVertexMessage(targetVertexKey);
      }
   } finally {
      // Close the iterator even if building or emitting a message fails.
      edgeIterator.close();
   }
}

/**
* Propagates distance messages outward from the start vertex until the end
* vertex receives one.
*
* Iteration 0: only the start vertex acts. It records a distance of 0; if
* it is also the end vertex it requests a global halt, otherwise it sends
* a message along each outgoing edge. Every vertex then deactivates; per
* the original sample's notes, a vertex stays inactive until a message
* arrives, which greatly reduces query time on long searches.
*
* Later iterations: a vertex that has incoming messages reads ALL of them
* first and keeps the smallest distance it has ever seen. Then:
*     - if it is the end vertex, it requests a global halt;
*     - otherwise, if this round lowered its recorded distance, it sends
*       one round of messages (recorded distance plus edge distance) along
*       its outgoing edges, and deactivates.
* A vertex with no incoming messages does nothing.
*
* Sending only after a strict improvement is what prevents endless
* propagation around a cycle, and it also cuts the number of messages.
* The decision must be based on whether ANY message in the round improved
* the recorded distance. The previous code compared only the last message
* read, so a round such as [5, 7] stored 5 but forwarded nothing.
*
* Because the end vertex halts the search in the first iteration in which
* a message reaches it, the distance it reports is the smallest distance
* among the messages that arrived in that iteration.
*
* @param globals  is a GraphGlobals with graph global state
* @param vertexState is the vertex state information about the vertex we
*     will operate on.
* @param inputMessages is an iterator of input vertex messages.
* @param outputMessages is an emitter for building and sending messages to
*     other vertices.
* @param finalRows an emitter for emitting output rows (not used here).
*/
public void operateOnVertex(GraphGlobals globals, VertexState vertexState,
         VertexMessageIterator inputMessages,
         VertexMessageEmitter outputMessages,
         RowEmitter finalRows) {

   // Get this vertex and its vertex key.
   TransitVertex vertex = (TransitVertex) vertexState.getVertex();
   VertexKey vtxKey = vertex.getVertexKey();

   if (globals.getIteration() == 0) {
      if (vkComparator_.compare(vtxKey, startVertexKey_) == 0) {
         // The start vertex is at distance 0 from itself. Recording it
         // keeps a message that cycles back here from looking like an
         // improvement and triggering a redundant wave of messages.
         vertex.setDistanceFromSource(0.0);

         // Start and end are the same vertex: nothing to search for.
         if (vkComparator_.compare(vtxKey, endVertexKey_) == 0) {
            System.out.println("globalHalt 1");
            globals.globalHalt();
            return;
         }
         // Otherwise kick off the search along each outgoing edge.
         sendMessages(0.0, vertexState, outputMessages);
      }
      // After iteration 0 only vertices that receive messages need to do
      // anything, so every vertex deactivates here.
      vertexState.deactivate();
      return;
   }

   // Not iteration 0: nothing to do unless at least one message arrived.
   if (!inputMessages.advanceToNextMessage()) {
      return;
   }

   // Read every incoming message, keeping the shortest distance seen so
   // far and remembering whether this round improved on it.
   boolean improved = false;
   do {
      double messageDistance = inputMessages.getDoubleAt(MSG__DISTANCE_COL);
      if (messageDistance < vertex.getDistanceFromSource()) {
         vertex.setDistanceFromSource(messageDistance);
         improved = true;
      }
   }
   while (inputMessages.advanceToNextMessage());

   // If this vertex is the end vertex, the search is over.
   if (vkComparator_.compare(vtxKey, endVertexKey_) == 0) {
      System.out.println("globalHalt 2");
      globals.globalHalt();
      return;
   }

   System.out.println("DDDIAGNOSTIC: operateOnVertex(): send");
   // Forward the new shortest distance only if it really is new.
   if (improved) {
      sendMessages(vertex.getDistanceFromSource(), vertexState,
         outputMessages);
   }
   // Now that this vertex has responded to its messages, it deactivates
   // itself until/unless it gets another message.
   System.out.println("deactivate");
   vertexState.deactivate();

}// operateOnVertex

   /**
    * Emits the search result. Only the end vertex can produce output, and
    * only when its recorded distance is below
    * TransitVertex.DEFAULT_DISTANCE_; in that case it emits one row holding
    * that distance. Every other vertex emits nothing.
    *
    * @param globals  is a GraphGlobals with graph global state
    * @param vertexState  holds the vertex being asked for final rows
    * @param emitter  is a RowEmitter for emitting final results rows
    */
   public void emitFinalRows(GraphGlobals globals, VertexState vertexState,
            RowEmitter emitter) {

      TransitVertex candidate = (TransitVertex) vertexState.getVertex();

      // Vertices other than the one we are searching for have no output.
      boolean isEndVertex =
         vkComparator_.compare(candidate.getVertexKey(), endVertexKey_) == 0;
      if (!isEndVertex) {
         return;
      }

      // No row unless the recorded distance is below the default value.
      boolean pathFound =
         candidate.getDistanceFromSource() < TransitVertex.DEFAULT_DISTANCE_;
      if (!pathFound) {
         return;
      }

      emitter.addDouble(candidate.getDistanceFromSource());
      emitter.emitRow();
   }// emitFinalRows

   /**
    * Reports undeliverable messages on standard output and otherwise takes
    * no action. According to the original sample's notes, such messages may
    * be seen if a message is sent to a target that is not an initialized
    * vertex.
    *
    * @param globals  provides the iteration number and other global state
    * @param undeliverableMessages  the messages that could not be delivered
    */
   public void undeliverableMessagesHandler(GraphGlobals globals,
    VertexMessageIterator undeliverableMessages) {
      // Walk every undeliverable message and print the distance it carried.
      for (boolean more = undeliverableMessages.advanceToNextMessage();
            more;
            more = undeliverableMessages.advanceToNextMessage()) {
         System.out.println("ERROR: undeliverable message: distance = "
            + undeliverableMessages.getDoubleAt(MSG__DISTANCE_COL));
      }
   }

   /**
    * Compares two vertex keys column by column, in column order, using a
    * per-column comparator chosen from the column's SQL type. In any column
    * a NULL orders before a non-NULL value, and two NULLs are treated as
    * equal for that column.
    *
    * compare() is synchronized because it copies values into scratch
    * holders that are shared by all calls on this instance.
    */
   class VertexKeyComparator implements Comparator<VertexKey> {
      // One comparator per key column, in column order.
      private final ArrayList<Comparator<ValueView>> valueComparators_;
      // Reusable scratch holders for the left and right key values.
      private final ArrayList<ValueHolder> lValues;
      private final ArrayList<ValueHolder> rValues;

      /**
       * @param vkSchema the SQL types of the vertex key columns, in order.
       */
      VertexKeyComparator(List<SqlType> vkSchema) {
         valueComparators_ = new ArrayList<Comparator<ValueView>>();
         lValues = new ArrayList<ValueHolder>();
         rValues = new ArrayList<ValueHolder>();
         for (int i = 0; i < vkSchema.size(); ++i) {
            SqlType type = vkSchema.get(i);
            valueComparators_.add(ValueViewComparator.getComparator(type));
            lValues.add(new ValueHolder(type));
            rValues.add(new ValueHolder(type));
         }
      }

      /**
       * @param l the left vertex key.
       * @param r the right vertex key.
       * @return a negative value, zero, or a positive value when l orders
       *     before, equal to, or after r.
       */
      @Override
      public synchronized int compare(VertexKey l, VertexKey r) {
         assert valueComparators_.size() == l.getColumnCount();
         assert valueComparators_.size() == r.getColumnCount();
         for (int i = 0; i < valueComparators_.size(); ++i) {
            ValueHolder lValue = lValues.get(i);
            ValueHolder rValue = rValues.get(i);
            // getValueAt() copies the i'th value from the VertexKey into
            // the ValueHolder (lValue or rValue below).
            l.getValueAt(i, lValue);
            r.getValueAt(i, rValue);
            if (lValue.isNull()) {
               if (rValue.isNull()) {
                  // Both NULL: this column does not decide the order.
                  continue;
               }
               return -1;
            }
            if (rValue.isNull()) {
               return 1;
            }
            int ret = valueComparators_.get(i).compare(lValue, rValue);
            if (0 != ret) {
               return ret;
            }
         }
         return 0;
      }
   }
}// SimpleSampleSearch