import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import com.asterdata.ncluster.sqlmr.ApiVersion; import com.asterdata.ncluster.sqlmr.ArgumentClause; import com.asterdata.ncluster.sqlmr.ClientVisibleException; import com.asterdata.ncluster.sqlmr.HelpInfo; import com.asterdata.ncluster.sqlmr.InputInfo; import com.asterdata.ncluster.sqlmr.OutputInfo; import com.asterdata.ncluster.sqlmr.util.AsterVersion; import com.asterdata.ncluster.sqlmr.data.ColumnDefinition; import com.asterdata.ncluster.sqlmr.data.MultipleInputs; import com.asterdata.ncluster.sqlmr.data.RowEmitter; import com.asterdata.ncluster.sqlmr.data.RowIterator; import com.asterdata.ncluster.sqlmr.data.SqlType; import com.asterdata.ncluster.sqlmr.data.RowHolder; import com.asterdata.ncluster.sqlmr.data.RowView; import com.asterdata.ncluster.sqlmr.data.ValueHolder; import com.asterdata.ncluster.sqlmr.data.ValueView; import com.asterdata.ncluster.sqlmr.data.ValueViewComparator; import com.asterdata.ncluster.graph.data.Edge; import com.asterdata.ncluster.graph.data.EdgeIterator; import com.asterdata.ncluster.graph.data.GraphGlobals; import com.asterdata.ncluster.graph.data.Vertex; import com.asterdata.ncluster.graph.data.VertexKey; import com.asterdata.ncluster.graph.data.VertexMessageEmitter; import com.asterdata.ncluster.graph.data.VertexMessageIterator; import com.asterdata.ncluster.graph.data.VertexState; import com.asterdata.ncluster.graph.GraphFunction; import com.asterdata.ncluster.graph.GraphRuntimeContract; import com.asterdata.ncluster.util.ImmutableList; @HelpInfo(usageSyntax = "SimpleSampleSearch(" + "on verticesTable as vertices_alias partition by vertex_id... " + "on edgesTable as edges_alias partition by src_id... " + "start_vertex(integer), end_vertex(integer)) ", shortDescription = "Searches for the path with the fewest edges " + "between a specified start vertex and end vertex.", longDescription = "SimpleSampleSearch Algorithm: " + "SimpleSampleSearch searches for the path with the fewest edges " + "between a specified start vertex and end vertex. " + "This function has two arguments -" + "1.'start_vertex()' - vertex key of the vertex to start from." + "2.'end_vertex()' - the vertex key of the vertex we want to reach.", inputColumns = "For the vertices_alias, the columns must include:\n" + " vertex_ID INTEGER, -- a unique identifier.\n" + " cityName CHAR(30)\n" + "For the edges_alias, the columns must include:\n" + " edge_ID INTEGER, -- Not used for much except DISTRIBUTE BY.\n" + " src_ID DOUBLE, -- Vertex ID of the source vertex.\n" + " dest_id DOUBLE, -- Vertex ID of the target vertex.\n" + " distance DOUBLE, -- Distance from the start vertex to the end" + " vertex (km).\n" + " weight DOUBLE -- Edge's 'weight' (influence or capacity)\n" + " sampling_weight DOUBLE\n", outputColumns = "(Distance double precision)", public class SimpleSampleSearch implements GraphFunction { // Cache the vertex IDs of the start and end vertices from the input // arguments in the SQL statement. private VertexKey startVertexKey_ = null; private VertexKey endVertexKey_ = null; // Cache the schema of the vertex key. private ImmutableListSqlType vertexKeySchema_ = null; // Used when comparing vertex keys. private VertexKeyComparator vkComparator_ = null; // For inputs from the vertices table, these indicate which column holds // which info. private static final int VTX__ID_COL_ = 0; private static final int VTX__CITYNAME_COL_ = VTX__ID_COL_ + 1; // Each input row from the "edges" table contains the vertex IDs // of the starting and ending vertices of this edge. // These column numbers allow us to index into an input row to get data // from a specific column, such as the column with vertex ID (vertex key) // of the "source" vertex. private static final int EDGE__ID_COL_ = 0; private static final int EDGE__SRC_ID_COL_ = EDGE__ID_COL_ + 1; private static final int EDGE__DEST_ID_COL_ = EDGE__SRC_ID_COL_ + 1; private static final int EDGE__DISTANCE_COL_ = EDGE__DEST_ID_COL_ + 1; private static final int EDGE__WEIGHT_COL_ = EDGE__DISTANCE_COL_ + 1; private static final int EDGE__SAMPLING_WEIGHT_COL_ = EDGE__WEIGHT_COL_ + 1; // These are the column indexes for the messages. // The distance up to and including the current (receiving) vertex. private static final int MSG__DISTANCE_COL = 0; /** * Fill in the runtime contract with information about the input table * schema, the output table schema, the message schema, and any arguments * specified, which in this case are the Vertex IDs of the vertex that we * start searching from and the vertex that we are searching for (the start * and end vertices). * * @param contract is a GraphRuntimeContract runtime contract structure, * which we will update with info about the input table schema, etc. */ public SimpleSampleSearch(GraphRuntimeContract contract) { // Get the vertex key schema, which is the same as // the partition definition schema, which in turn is // based on the column that holds the start vertex's vertex key. // The "vertices_alias" is the alias of the vertices table that we // specified in the graph function call in the SQL statement. InputInfo inputInfo = contract.getInputInfo("vertices_alias"); vertexKeySchema_ = inputInfo.getRowPartitioningExpressionTypes(); // Set the start and end vertex key values based on the input arguments. startVertexKey_ = getVertexKeyFromArgument(contract, "start_vertex"); endVertexKey_ = getVertexKeyFromArgument(contract, "end_vertex"); // Create an object that can be used to compare vertex keys. vkComparator_ = new VertexKeyComparator(vertexKeySchema_); // Set the vertex message schema. Messages sent to incident vertices // contain the cumulative distance so far along the path from Start to // End, so the message schema contains a single column that holds // a SQL FLOAT (Java "double") value. ArrayListSqlType vertexMessageSchema = null; vertexMessageSchema = new ArrayListSqlType(); vertexMessageSchema.add(0, SqlType.doublePrecision()); contract.setVertexMessageSchema( ImmutableList.elementsOf(vertexMessageSchema)); // Set the schema of the output table, which has 1 column: a DOUBLE // that holds the cumulative distance from the starting vertex. ColumnDefinition distanceColDef = new ColumnDefinition("distance", SqlType.doublePrecision()); ArrayListColumnDefinition outputColumns = new ArrayListColumnDefinition(); outputColumns.add(distanceColDef); contract.setOutputInfo(new OutputInfo(outputColumns)); // Complete the graph contract and cache the completed contract. contract.complete(); }// SimpleSampleSearch constructor /** * Given the name of an argument that holds a vertex key, return the value * of that input argument as a VertexKey. Ideally, this code would be * generic and return any vertex key (regardless of the number and * data types of that vertex key), but this code is not that generic; this * code assumes that the vertex key consists of one column of type "double". * * @param contract is a GraphRuntimeContract runtime contract structure, * which we will update with info about the input table schema, etc. */ private VertexKey getVertexKeyFromArgument(GraphRuntimeContract contract, String argumentName) { /* * Read the vertex key value from the input arguments. * This is done by building a row (RowHolder) for the argument (e.g. * start_vertex or end_vertex) and using the RowHolder as an * input to the VertexKey class constructor. * (In this case, the "row" has only one column, because one column, * the src_ID, is sufficient to uniquely identify a vertex.) */ ArgumentClause vertexArg = contract.useArgumentClause(argumentName); ImmutableListString vertexArgValues = vertexArg.getValues(); IteratorString vertexArgValuesIterator = vertexArgValues.iterator(); // Get the first column of the vertex key (assumes there's only 1 col!). String vertexIdAsString = vertexArgValuesIterator.next(); // Get the vertex ID from that column. Double vertexId = Double.parseDouble(vertexIdAsString); // Construct a vertex key based on this vertex ID. RowHolder vertexKeyValues = new RowHolder(vertexKeySchema_); vertexKeyValues.setDoubleAt(0, vertexId); VertexKey vkey = new VertexKey(vertexKeyValues); return vkey; } /** * The initializeVertex method is invoked once per input partition; at each * invocation, the function returns the processing state (VertexState) of a * single vertex whose vertex key (VertexKey) is constructed using the * unique src_ID value of the current partition. * Each individual row of the current partition is used to * construct an incident edge whose destination vertex key is * constructed using the destination value of the given row. * * @param globals is a {@linkplain GraphGlobals} providing access to common * graph global state, including the current iteration number. * @param vtxState is a {@linkplain VertexState} that encapsulates the * processing state of a vertex. * @param inputs is a {@linkplain MultipleInput} providing access to a * current partition of input rows for a given combination of sid0 and * sid1 values. */ public void initializeVertex(GraphGlobals globals, VertexState vtxState, MultipleInputs inputs) { // A standard prefix for any error messages we need to log. String errMsg = "SimpleSampleSearchinitializeVertex(): "; // The SQL query is partitioned on the vertex key, so now we can // get the vertex key for the vertex that we are about to create. RowHolder vertexKeyValues = new RowHolder(vertexKeySchema_); vertexKeyValues.copyFromRow((RowView) inputs.getPartitionDefinition()); VertexKey vertexKey = new VertexKey(vertexKeyValues); // Get the row corresponding to the vertex table input (information // about the current vertex that we are initializing). "vertices_alias" // is the alias of the vertices table specified in the SELECT statement. // We assume that the iterator returns exactly one row and that the row // does not contain any NULLs. RowIterator vertexRowIterator = inputs.getRowIterator("vertices_alias"); boolean hasRow = vertexRowIterator.advanceToNextRow(); if (!hasRow) { String msg = errMsg + "vertex iterator was empty. Possible causes " + "include an edge whose src_id (ID of the starting vertex) " + "does not match the vertex ID of any vertex."; throw new ClientVisibleException(msg); } String cityName = vertexRowIterator.getStringAt(VTX__CITYNAME_COL_); // Create a new vertex based on the information in this row. TransitVertex transitVertex = new TransitVertex(vertexKey, cityName); // "Add" this vertex to the vertex state. (Unintuitively, the vertex is // stored "in" the vertex state, not the other way around.) vtxState.addVertex(transitVertex); /* * Create a list of incident edges. Column EDGE__DEST_ID_COL_ holds the * vertex key value of a neighboring destination vertex. */ RowIterator edgeTableIterator = inputs.getRowIterator("edges_alias"); RowHolder destVertexKeyValue = null; double destVertexKeyAsDouble = (double) 0.0; VertexKey destVertexKey = null; // This will hold the edges (one at a time) of the vertex we create. TransitEdge transitEdge = null; // True if this vertex has an outgoing edge that we haven't processed yet. boolean hasAnotherEdge = edgeTableIterator.advanceToNextRow(); double distance = 0.0; double samplingWeight = 0.0; double weight = 0.0; while (hasAnotherEdge) { // Create the destination vertex key. destVertexKeyValue = new RowHolder(vertexKeySchema_); destVertexKeyAsDouble = edgeTableIterator.getDoubleAt(EDGE__DEST_ID_COL_); destVertexKeyValue.setDoubleAt(0, destVertexKeyAsDouble); destVertexKey = new VertexKey(destVertexKeyValue); if (edgeTableIterator.isNullAt(EDGE__SAMPLING_WEIGHT_COL_)) { String msg = errMsg + "edgeTableIterator: EDGE__SAMPLING_WEIGHT_COL_: NULL: "; throw new ClientVisibleException(msg); } samplingWeight = edgeTableIterator.getDoubleAt(EDGE__SAMPLING_WEIGHT_COL_); if (edgeTableIterator.isNullAt(EDGE__DISTANCE_COL_)) { String msg = errMsg + "edgeTableIterator: EDGE__DISTANCE_COL_: NULL: "; throw new ClientVisibleException(msg); } distance = edgeTableIterator.getDoubleAt(EDGE__DISTANCE_COL_); // Construct an Edge: use the vertex key of the destination vertex // and add the edge to the Vertex object (actually, to the // vertex's VertexState). transitEdge = new TransitEdge(destVertexKey, samplingWeight, distance); if (edgeTableIterator.isNullAt(EDGE__WEIGHT_COL_)) { String msg = errMsg + "edgeTableIterator: EDGE__WEIGHT_COL_: NULL: "; throw new ClientVisibleException(msg); } weight = edgeTableIterator.getDoubleAt(EDGE__WEIGHT_COL_); transitEdge.setWeight(weight); vtxState.addEdge(transitEdge); hasAnotherEdge = edgeTableIterator.advanceToNextRow(); }// for each row corresponding to an edge }// initializeVertex /** * This sends a message to a vertex; the message includes the distance * (in km) up to and including the vertex that receives the message. * @param distanceSoFar the distance up to, and including, the vertex that * receives the message. * @param vertexState the vertexState of the vertex sending the message. * @param outputMessages an emitter for constructing and sending messages * to other vertices. */ private void sendMessages(double distanceSoFar, VertexState vertexState, VertexMessageEmitter outputMessages) { // The vertex key of the vertex that we will send the message to. VertexKey targetVertexKey = null; // The distance from the sending vertex to the receiving vertex. double distanceToNextVertex = (double) 0.0; // The total distance to the next vertex (the distance so far // plus the distance along the edge to the next vertex). double totalDistance = (double) 0.0; // The edge that we'll send the message along. TransitEdge currentEdge = null; // For each outgoing edge of this vertex, send a message... EdgeIterator edgeIterator = vertexState.getEdgeIterator(); String msg = ""; while (edgeIterator.advanceToNextEdge()) { /* * Grab the next incident edge. Form the vertex key for the * destination vertex, build the message payload, and emit the * message. */ currentEdge = (TransitEdge) edgeIterator.getEdge(); targetVertexKey = currentEdge.getTargetVertexKey(); distanceToNextVertex = currentEdge.getDistance(); totalDistance = distanceSoFar + distanceToNextVertex; outputMessages.addDouble(totalDistance); outputMessages.emitVertexMessage(targetVertexKey); } edgeIterator.close(); } /** * The operateOnVertex method propagates messages sent from the search's * starting vertex until we find the ending vertex. * * The start vertex is the only vertex to send messages during * iteration 0. After that, each vertex propagates received messages * to its "downstream" vertices along its incident edges. * * Once a vertex propagates vertex messages, its job is done and it * calls "deactivate()". The algorithm stops if either the destination * vertex gets a message (issues a "global halt") or no vertex sends any * messages. * * To avoid getting into an infinite loop if there is a loop in the graph, * each vertex checks the distance in each incoming message, and it * sends an outgoing message only if the newest distance is shorter than * any previous distance. (This also reduces the number of messages.) * * To help make this clear, think about the difference between * finding the shortest path in terms of edges vs. the shortest path in * terms of kilometers. If we want to find the fewest edges, then we * can call localHalt() during the first iteration that a message reaches * this vertex; any message in a subsequent iteration that reaches this * vertex is guaranteed to have taken more edges to get here (one edge per * iteration), AND won't influence the search in a way that reduces the * number of edges in the "downstream" portion of the path. So the path is * guaranteed to be longer (in edges). * Contrast that with the algorithm for finding the smallest number of * kilometers. A path that required more edges but fewer kilometers * (averages fewer kilometers per edge) is NOT something that we want to * ignore, so we deactivate rather than localHalt(). * * In this particular test/demo, we have a nested loop: * for each incoming message: * for each outgoing edge: * send message * If there are 2 incoming messages and 3 outgoing edges, the program potentially sends * 6 outgoing messages, but only 3 of those actually have the shortest * distance in them. (The number that the program sends depends upon whether * the incoming message I'm processing now has a shorter distance than any * previous incoming message. If it's shorter, the code sends a new set of outgoing * messages. If it's longer, nothing is sent.) It would be more * efficient to code this differently; instead of nesting the "per outgoing * edge" loop inside the "per incoming message" loop, the code could read ALL of * the incoming messages first, select the shortest distance, and then send * only 1 round of outgoing messages. * for each incoming message: * if distance is shorter than previous, then keep new shortest dist. * for each outgoing edge: * send message * @param globals is a GraphGlobals with graph global state * @param vertexState is the vertex state information about the vertex we * will operate on. * @param inputMessages is an iterator of input vertex messages. * @param outputMessages is an emitter for building and sending messages to * other vertices. * @param finalRows an emitter for emitting output rows. */ public void operateOnVertex(GraphGlobals globals, VertexState vertexState, VertexMessageIterator inputMessages, VertexMessageEmitter outputMessages, RowEmitter finalRows) { // Get this vertex and its vertex key. TransitVertex vertex = (TransitVertex) vertexState.getVertex(); VertexKey vtxKey = vertex.getVertexKey(); // If this is iteration 0 ... if (globals.getIteration() == 0) { // If this vertex is the "start" vertex if (vkComparator_.compare(vtxKey, startVertexKey_) == 0) { // If this vertex is also the end vertex... if (vkComparator_.compare(vtxKey, endVertexKey_) == 0) { vertex.setDistanceFromSource( (double) 0.0); System.out.println("globalHalt 1"); globals.globalHalt(); return; } // If this vertex is the "start" vertex but not the end vertex... else { // For each outgoing edge of this vertex, send a message... sendMessages((double) 0.0, vertexState, outputMessages); } } // After the 0th iteration, only the vertices that get messages will // need to do anything, so we'll deactivate each vertex until it gets // a message. This does not change the output, but it reduces query // time enormously when the search requires many iterations. vertexState.deactivate(); return; } // If it's not iteration 0, and there is at least 1 incoming message... else if (inputMessages.advanceToNextMessage()) { // The distance so far (in kilometers) from the start vertex to this // vertex. double distanceSoFar; // For each incoming message, if it has a shorter distance than any // previous message to this vertex, then remember that distance. do { // Get the distance in the current message. distanceSoFar = inputMessages.getDoubleAt(MSG__DISTANCE_COL); // If this is shorter than any previous distance we received... if (distanceSoFar < vertex.getDistanceFromSource()) { // Store this new shortest distance vertex.setDistanceFromSource(distanceSoFar); } } while (inputMessages.advanceToNextMessage()); // If this vertex is the end vertex... if (vkComparator_.compare(vtxKey, endVertexKey_) == 0) { System.out.println("globalHalt 2"); globals.globalHalt(); return; } // If this isn't the end vertex that we're searching for, then... else { // ...for each outgoing edge of this vertex, send a message... System.out.println("DDDIAGNOSTIC: operateOnVertex(): send"); // If this is the shortest distance we received so far... if (distanceSoFar <= vertex.getDistanceFromSource()) { sendMessages(vertex.getDistanceFromSource(), vertexState, outputMessages); } // Now that this vertex has responded to each message, the vertex // deactivates itself until/unless it gets another message. System.out.println("deactivate"); vertexState.deactivate(); } }// not iteration 0, and there are incoming messages }// operateOnVertex /** * The emitFinalRows method is called once per vertex to determine if the * path of interest was found. That will be the case if the ending * vertex has a "shortestDistance" that is different from the starting * value, in which case that vertex will output a single row with the * distance. No rows are emitted otherwise. * * @param globals is a GraphGlobals with graph global state * @param vertexState is a Vertex with local vertex state * @param emitter is a RowEmitter for emitting final results rows */ public void emitFinalRows(GraphGlobals globals, VertexState vertexState, RowEmitter emitter) { TransitVertex vertex = (TransitVertex) vertexState.getVertex(); VertexKey vertexKey = vertex.getVertexKey(); // If this is the vertex we're searching for... if (vkComparator_.compare(vertexKey, this.endVertexKey_) == 0) { if (vertex.getDistanceFromSource() < TransitVertex.DEFAULT_DISTANCE_) { emitter.addDouble(vertex.getDistanceFromSource()); emitter.emitRow(); } } return; }// emitFinalRows /** * Ignore undeliverable messages. These may be seen if a message is sent * to a target which is not an initialized vertex. * * @param globals provides the iteration number and other global state * @param undeliverableMessages the messages that could not be delivered */ public void undeliverableMessagesHandler(GraphGlobals globals, VertexMessageIterator undeliverableMessages) { while (undeliverableMessages.advanceToNextMessage()) { System.out.println("ERROR: undeliverable message: distance = " + undeliverableMessages.getDoubleAt(MSG__DISTANCE_COL)); } return; } class VertexKeyComparator implements ComparatorVertexKey{ ArrayListComparatorValueView valueComparators_ = null; ArrayListValueHolder lValues = null; ArrayListValueHolder rValues = null; VertexKeyComparator(ListSqlType vkSchema) { valueComparators_ = new ArrayListComparatorValueView(); lValues = new ArrayListValueHolder(); rValues = new ArrayListValueHolder(); SqlType type = null; for (int i = 0; i < vkSchema.size(); ++i) { type = vkSchema.get(i); valueComparators_.add( ValueViewComparator.getComparator(type)); lValues.add(new ValueHolder(type)); rValues.add(new ValueHolder(type)); } } @Override public synchronized int compare(VertexKey l, VertexKey r) { assert valueComparators_.size() == l.getColumnCount(); assert valueComparators_.size() == r.getColumnCount(); ValueHolder lValue = null; ValueHolder rValue = null; int ret = 0; for (int i = 0 ; i < valueComparators_.size(); ++i) { lValue = lValues.get(i); rValue = rValues.get(i); // getValueAt() will copy the i'th value from the VertexKey to the // ValueHolder (lValue or rValue below). l.getValueAt(i, lValue); r.getValueAt(i, rValue); if (lValue.isNull()) { if (rValue.isNull()) { continue; } return -1; } else { if (rValue.isNull()) { return 1; } } ret = valueComparators_.get(i).compare(lValue,rValue); if (0 != ret) { return ret; } } return 0; } } }// SimpleSampleSearch