This section shows you how to use the vertex and edge iterators in the initializeVertex() method of the HelloWorld SQL-MapReduce function.
In this example, you will add println() statements that print the name of the vertex being processed and the rows from the edges table associated with that vertex. In this case, for every vertex, the function is supplied with the edges going out from the vertex. This is called a cogroup or input partition.
-
Replace the content of the initializeVertex() method with the following code:
// // Initialize the vertex key from the partition definition structure. // VertexKey vertexKey = new VertexKey(inputs.getPartitionDefinition()); HelloWorldVertex vertex = new HelloWorldVertex(vertexKey); vertexState.addVertex(vertex); System.out.println(" -----------------------------------------------------"); // Examine rows from input relation "etable" RowIterator etable_inputsIterator = inputs.getRowIterator("etable"); int i=0; // TODO: do something with this row iterator, i.e. fetch certain columns value. System.out.println(" etable column count: " + etable_inputsIterator.getColumnCount()); System.out.println(); RowHolder rowHolder; VertexKey targetVertexKey; if (etable_inputsIterator.advanceToNextRow()){ // Get the column count of etable System.out.println(" Source Node | Destination Node"); do { // Create a row and add to it the id of the target vertex. rowHolder = new RowHolder(SqlType.integer()); // Display the input row contents. System.out.println(" " + etable_inputsIterator.getIntAt(1)+ " "+etable_inputsIterator.getIntAt(2)); rowHolder.setIntAt(0, etable_inputsIterator.getIntAt(2)); targetVertexKey = new VertexKey(rowHolder); System.out.println("Adding edge to vertexState..."); vertexState.addEdge(new Edge(targetVertexKey)); i++; } while (etable_inputsIterator.advanceToNextRow()); } // end if // Examine rows from input relation "vtable." RowIterator vtable_inputsIterator = inputs.getRowIterator("vtable"); // TODO: do something with this row iterator, i.e. fetch certain columns value. if (vtable_inputsIterator.advanceToNextRow()){ System.out.println(); System.out.println(" " + i + " edge(s) go out from this vertex (" + vtable_inputsIterator.getStringAt(1) + ")"); System.out.println(" -----------------------------------------------------"); System.out.println(); }
- Build and deploy the function.
- Run the function using the same SQL-MapReduce statement that you used in the previous example.
- Open the log file to inspect the output.
SQL-GR divides the input into cogroups or input partitions and distributes vertex initialization among the available worker nodes, as shown in the example in the following figure. In this example, the cluster has three worker nodes.
Vertex initialization example