/* * Unpublished work. * Copyright (c) 2013 by Teradata Corporation. All rights reserved. * TERADATA CORPORATION CONFIDENTIAL AND TRADE SECRET */ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.math.BigInteger; import com.asterdata.ncluster.sqlmr.ApiVersion; import com.asterdata.ncluster.sqlmr.ByFunctionPredicate; import com.asterdata.ncluster.sqlmr.ClientVisibleException; import com.asterdata.ncluster.sqlmr.CollaborativePlanning; import com.asterdata.ncluster.sqlmr.data.ColumnDefinition; import com.asterdata.ncluster.sqlmr.data.PartitionDefinition; import com.asterdata.ncluster.sqlmr.data.RowEmitter; import com.asterdata.ncluster.sqlmr.data.RowIterator; import com.asterdata.ncluster.sqlmr.data.SqlType; import com.asterdata.ncluster.sqlmr.data.ValueHolder; import com.asterdata.ncluster.sqlmr.Distribution; import com.asterdata.ncluster.sqlmr.DistributionType; import com.asterdata.ncluster.sqlmr.HelpInfo; import com.asterdata.ncluster.sqlmr.IllegalUsageException; import com.asterdata.ncluster.sqlmr.InputContext; import com.asterdata.ncluster.sqlmr.InputInfo; import com.asterdata.ncluster.sqlmr.OperatorType; import com.asterdata.ncluster.sqlmr.Order; import com.asterdata.ncluster.sqlmr.OrderDefinition; import com.asterdata.ncluster.sqlmr.OrderLimit; import com.asterdata.ncluster.sqlmr.OutputInfo; import com.asterdata.ncluster.sqlmr.PartitionFunction; import com.asterdata.ncluster.sqlmr.PlanInputInfo; import com.asterdata.ncluster.sqlmr.PlanningContract; import com.asterdata.ncluster.sqlmr.PlanOutputInfo; import com.asterdata.ncluster.sqlmr.OnInputPredicate; import com.asterdata.ncluster.sqlmr.OnOutputPredicate; import com.asterdata.ncluster.sqlmr.QueryContextInfo; import com.asterdata.ncluster.sqlmr.QueryContextReply; import com.asterdata.ncluster.sqlmr.RowFunction; import com.asterdata.ncluster.sqlmr.RuntimeContract; import com.asterdata.ncluster.sqlmr.util.AsterVersion; import com.asterdata.ncluster.util.ImmutableList; @HelpInfo( usageSyntax = "md5sum_cbp(" + " on ..." + " )", shortDescription = "Echos the input columns in addition to the md5sum of " + "each column value", longDescription = "By default, the function will have the following behavior:\n" + "* The function will pass through the input Distribution" + " and Order to the output.\n" + "* The OrderLimit will be applied when it appears in the query context.\n" + "* If the predicate has operator type '<' and is applied to an" + " ascending column, it will be applied by the function. Otherwise," + " the predicate will be pushed to the input.\n" + "* The function will agree to conduct output column projection should" + " the planner ask for it.\n" + "* The function will specify the input column projection if there is" + " output column projection. A column from the input that" + " does not appear in the output will be added to the input columns" + " needed list when possible.\n\n" + "The function's default behavior can be changed with the following" + " argument:\n" + " outputColRename: Rename the output columns by adding a prefix" + " to the input column name. There are also the following side-effects" + " when specifying this argument:\n" + "* The column names in the Order and Distribution of the PlanOutputInfo" + " will be renamed by prepending the prefix 'prefix_'.\n" + "* The output Order will be a prefix of the columns in the input" + " Order\n" + "* The distribution will be effectively removed, i.e., if the input" + " Distribution is of type 'distributed', the output" + " distribution will be set to 'any'.", inputColumns = "col1, col2, ..., coln", outputColumns = "col1, col1_md5sum, col2, col2_md5sum, ..., coln, coln_md5sum", author = AsterVersion.ASTER_COPYRIGHT_STRING, version = AsterVersion.ASTER_VERSION_STRING, apiVersion = ApiVersion.CURRENT_API_VERSION ) public final class md5sum_cbp implements RowFunction, PartitionFunction, CollaborativePlanning { private boolean outputColRename_; private int passThruColA_inputInd_; private int passThruColA_outputInd_; private int passThruColB_inputInd_; private int passThruColB_outputInd_; private long numRowsEmitted_; private long limit_; private InputInfo runtimeContractInputInfo_; private OutputInfo runtimeContractOutputInfo_; private RuntimeSchema runtimeSchema_; private ListColumnDefinition finalOutputColumns_; private ListSqlType rowSqlTypes_; private ByFunctionPredicate byFunctionPredicateLT_; private ColumnDefinition byFunctionColDef_; private PlanningContract.CompletedContract complPlanContract_; private final String prefix_ = "prefix_"; private final String md5sumSuffix_ = "_md5sum"; public md5sum_cbp(RuntimeContract contract) { outputColRename_ = false; passThruColA_inputInd_ = 0; passThruColA_outputInd_ = 0; passThruColB_inputInd_ = 1; passThruColB_outputInd_ = 1; numRowsEmitted_ = 0; limit_ = -1; InputInfo inputInfo = contract.getInputInfo(); runtimeContractInputInfo_ = inputInfo; // Determine which collaborative planning optimizations to enable if (contract.hasArgumentClause("cbpmode") ) { String cbpmode = contract.useArgumentClause("cbpmode").getSingleValue(); System.out.println ("cbpmode: " + cbpmode); if (cbpmode.equalsIgnoreCase("outputColRename")) { outputColRename_ = true; } else { throw new IllegalUsageException ("cbpmode: " + cbpmode + " is invalid"); } } // // The output information is taken directly from the input information. // ImmutableListColumnDefinition inputSchema = inputInfo.getColumns(); checkInputSchema (inputSchema); // // The output information is taken directly from the input information. // ListColumnDefinition outputColumns = new ArrayColumnDefinition(); ListSqlType columnTypes = ColumnDefinition.typesFromColumns(inputSchema); for (int i = 0; i < columnTypes.size(); ++i) { String columnName = inputInfo.getColumnName(i); ////////////////////////////////////////////////////////////////////// // For the following arguments, we rename the output order // column pre-pending the string // prefix_ to the input order column name. if (outputColRename_) { columnName = prefix_ + columnName; } outputColumns.add(new ColumnDefinition(columnName, columnTypes.get(i))); outputColumns.add(new ColumnDefinition(columnName + md5sumSuffix_, SqlType.getType("character varying"))); } runtimeContractOutputInfo_ = new OutputInfo( outputColumns ); runtimeSchema_ = new RuntimeSchema (inputSchema, outputColumns, prefix_); contract.setOutputInfo( runtimeContractOutputInfo_ ); contract.complete(); } private void checkInputSchema (ListColumnDefinition cols) { for (ColumnDefinition col: cols) { if (col.getColumnName().endsWith(md5sumSuffix_)) { throw new ClientVisibleException ("Input column name cannot have suffix " + md5sumSuffix_); } } } public String calculateMd5sum(String value) { if (value == null) return null; try { MessageDigest m = MessageDigest.getInstance("MD5"); m.update(value.getBytes(), 0, value.length()); BigInteger md5sum = new BigInteger(1, m.digest()); return String.format("%1$032x", md5sum); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); return null; }//end catch }//end calculdateMd5sum public void operateOnSomeRows( RowIterator inputIterator, RowEmitter outputEmitter ) { while (inputIterator.advanceToNextRow()) { ////////////////////////////////////////////////////////////////////// // Enforce the (order) limit, if // the function agreed to apply it in the QueryContextReply. // The variable numRowsEmitted_ is an instance variable // initialized once to 0 in the function's constructor. // If the following code block were in a partition function's // operateOnPartition method, then the variable value would be persistent // across calls to operateOnPartition. if ( limit_ != -1 && numRowsEmitted_ >= limit_) { break; } outputEmitter = buildOutputRow (inputIterator, outputEmitter); if (outputEmitter != null) { outputEmitter.emitRow(); } else { break; } numRowsEmitted_++; } // while inputIterator } /* This method add values for an output row. The method returns null if the function should no longer emit rows, when it applying a limit, for instance. */ private RowEmitter buildOutputRow(RowIterator inputIterator, RowEmitter outputEmitter) { assert (inputIterator != null); assert (outputEmitter != null); for (int finalOutputIndex=0; finalOutputIndex<finalOutputColumns_.size(); finalOutputIndex++) { Integer inputIndex; ColumnDefinition finalOutputColDef = finalOutputColumns_.get(finalOutputIndex); String finalOutputColName = finalOutputColDef.getColumnName(); Integer runtimeContractOutputIndex = runtimeSchema_.getRuntimeOutputIndex(finalOutputColName); // add value from the corresponding index of input to output // emitter. use the mod of the corresponding index // in the runtime contract's output schema to determine // whether the column corresponds to an input column // or an md5sum. if (runtimeContractOutputIndex % 2 == 0) { // add value from corresponding index of input to output // emitter inputIndex = runtimeSchema_.getRuntimeInputIndex( finalOutputColDef.getColumnName()); assert (inputIndex >= 0); assert (inputIndex < inputIterator.getColumnCount()); if (inputIterator.isNullAt(inputIndex)) { outputEmitter.addNull(); } else { ValueHolder vh = new ValueHolder (rowSqlTypes_.get(finalOutputIndex)); inputIterator.getValueAt(inputIndex, vh); //////////////////////////////////////////////////////////////// // Apply a predicate '<' that was pushed // to the function. It is assumed that the output column involved // in the predicate has data in ascending order. // // Iteration over the input and emission to the output // is terminated if the by function predicate with // operatorType '<' evaluates to false on the column // with data in ascending order. // It was determined during planning contract // negotiation that the predicate's column // is in ascending order. // // Also, the checks below ensure that the predicate column was // not projected out. if (byFunctionColDef_ != null && finalOutputColDef.getColumnName().equals(byFunctionColDef_.getColumnName()) && finalOutputColDef.getColumnType() == byFunctionColDef_.getColumnType() && !byFunctionPredicateLT_.evaluate(vh)) { return null; } outputEmitter.addValue(vh); } } // compute the input value's md5sum for the output column // assuming the column names ends with "_md5sum" else { inputIndex = runtimeSchema_.getRuntimeInputIndex( finalOutputColDef.getColumnName().split(md5sumSuffix_)[0]); assert (inputIndex >= 0); assert (inputIndex < inputIterator.getColumnCount()); if (inputIterator.isNullAt(inputIndex)) { outputEmitter.addNull(); } else { String value = inputIterator.getStringAt(inputIndex); outputEmitter.addString(calculateMd5sum(value)); } } } // for each finalOutputColumn return outputEmitter; } public void operateOnPartition( PartitionDefinition partition, RowIterator inputIterator, RowEmitter outputEmitter ) { operateOnSomeRows(inputIterator, outputEmitter); } public void completePlanningContract( PlanningContract planContract ) { // read in the incomplete planning contract PlanInputInfo planInputInfo = planContract.getPlanInputInfo(); Distribution inDistribution = null; ListColumnDefinition inDistrCols = null; DistributionType inDistrType = null; // read in distribution if (planInputInfo.hasDistribution()) { inDistribution = planInputInfo.getDistribution(); inDistrCols = inDistribution.getDistributionColumns(); inDistrType = inDistribution.getDistributionType(); } // read in order Order inOrder = null; ListOrderDefinition inOrderDefs = null; if (planInputInfo.hasOrder()) { inOrder = planInputInfo.getOrder(); inOrderDefs = inOrder.getOrderDefinitions(); } // read in query context info QueryContextInfo qci = null; OrderLimit orderLimit = null; ListOnOutputPredicate outPreds = null; ListOnInputPredicate onInputPredicates = new ArrayListOnInputPredicateOnInputPredicate(); if (planContract.hasQueryContextInfo()) { qci = planContract.getQueryContextInfo(); // read in order limit if (qci.hasOrderLimit()) { orderLimit = qci.getOrderLimit(); } outPreds = qci.getOnOutputPredicates(); } /////////////////////////////////////////////////////////////////// // Input distribution passes through as output distribution, // specified in the completedPlanningContract method. Distribution outDistribution = null; if (!outputColRename_) { outDistribution = inDistribution; } else if (planInputInfo.hasDistribution()) { ArrayListColumnDefinition outDistrCols = new ArrayListColumnDefinition(); for (ColumnDefinition inDistrCol: inDistrCols) { //////////////////////////////////////////////////////////////////// // Rename the output distribution column rename by prepending // prefix_ to the input column name. // The output columns names in the runtime contract // have also been prepended with the prefix_. ColumnDefinition outDistrCol = new ColumnDefinition (prefix_ + inDistrCol.getColumnName(), inDistrCol.getColumnType()); outDistrCols.add(outDistrCol); } /////////////////////////////////////////////////////////////////////// // Kill the distribution column, going from 'distributed' to 'any', // specified in the completePlanningContract method DistributionType outDistrType; // change the distribution type if (inDistrType == DistributionType.distributed){ outDistrType = DistributionType.any; } else { outDistrType = inDistrType; } outDistribution = new Distribution.Builder() .setDistributionColumns(outDistrCols) .setDistributionType(outDistrType) .toDistribution(); } ////////////////////////////////////////////////////////////////////////// // Passthrough of input order, which is the same as output order. // This is specified in the completePlanningContract method. Order outOrder = null; if (!outputColRename_) { outOrder = inOrder; } else if (planInputInfo.hasOrder()) { ////////////////////////////////////////////////////////////////////////// // Send prefix of the input order. For instance, if the input order is // a,b,c, the output order is // a,b. int numOutOrderDefs = inOrderDefs.size()-1; ArrayListOrderDefinition outOrderDefs = new ArrayListOrderDefinition(); for (int i=0; i<numOutOrderDefs; i++) { OrderDefinition inOrderDef = inOrderDefs.get(i); // Rename the output order column name by prepending // prefix_ to the input column name. // The output columns names in the runtime contract // have also been prepended with the prefix_. OrderDefinition outOrderDef = new OrderDefinition (prefix_ + inOrderDef.getColumnName(), inOrderDef.getColumnType(), inOrderDef.isAscending(), inOrderDef.isNullsFirst()); outOrderDefs.add(outOrderDef); } outOrder = new Order.Builder() .setOrderDefinitions(outOrderDefs) .toOrder(); } PlanOutputInfo planOutputInfo = null; planOutputInfo = new PlanOutputInfo.Builder() .setDistribution(outDistribution) .setOrder(outOrder) .toPlanOutputInfo(); planContract.setPlanOutputInfo(planOutputInfo); ////////////////////////////////////////////////////////////////////////// // Conduct output column projection boolean shouldProjectOutputColumns = false; if (planContract.hasQueryContextInfo() && !qci.getOutputColumnsToProject().isEmpty()) { shouldProjectOutputColumns = true; finalOutputColumns_ = qci.getOutputColumnsToProject(); } else { finalOutputColumns_ = runtimeSchema_.getOutputCols(); } ArrayListInputContext inputContexts = new ArrayListInputContext(); ListByFunctionPredicate byFunctionPredicates = new ArrayListByFunctionPredicate(); if (planContract.hasQueryContextInfo()) { ///////////////////////////////////////////////////////////////////////// // Conduct input column projection // This shows an example where input columns needed are the output columns // projected. // This is specified in the completePlanningContract method. ListColumnDefinition inputColumnsNeeded = new ArrayListColumnDefinition(); // add the output columns projected to the input column // projection ListColumnDefinition outputColumnsToProject = qci.getOutputColumnsToProject(); // The planner expects inputColumnsNeeded only when there are // output columns to project // // Add a new column from the input that is not in the output // column projection if (!outputColumnsToProject.isEmpty()) { inputColumnsNeeded = getInputColNotInOutputProjection(outputColumnsToProject); } String inputColNameNeeded=null; if (!inputColumnsNeeded.isEmpty()) { inputColNameNeeded = inputColumnsNeeded.get(0).getColumnName(); } HashSetString columnNamesAdded = new HashSetString(); for (ColumnDefinition colDef: outputColumnsToProject) { assert (colDef != null); // If projection contains column names // "id", and "number_md5sum", // the columns needed would be "id" and "number" // String columnName; if (colDef.getColumnName().endsWith(md5sumSuffix_)) { columnName = colDef.getColumnName().split(md5sumSuffix_)[0]; } else { columnName = colDef.getColumnName(); } if (!columnName.equals(inputColNameNeeded) && !columnNamesAdded.contains(columnName)) { Integer inputColIndex = runtimeSchema_.getRuntimeInputIndex(columnName); assert inputColIndex != null; ColumnDefinition inputColumn = runtimeSchema_.getInputCols().get(inputColIndex); inputColumnsNeeded.add(inputColumn); columnNamesAdded.add(columnName); } } for (OnOutputPredicate outPred : outPreds) { boolean pushedToFunction = false; ListColumnDefinition predOutCols = outPred.getColumns(); //////////////////////////////////////////////////////////////////////// // The following is an example of predicate push-down to the function, // where operator is '<' and the predicate column has // data in ascending order // (versus a column containing md5sum's). // This is specified in the completePlanningContract method // if (outPred.isSimpleConstraint()) { // a simple constraint can only involve a single column ColumnDefinition predOutCol = predOutCols.get(0); if (outPred.getOperatorType() == OperatorType.LT && // isColAscending is a helper function that // determines whether the column is produced // by the function in ascending order. // As all input columns pass through to the // output, // the function looks at the corresponding // OrderDefinition in the Order of the PlanInputInfo // to see if it is ascending. isColAscending (predOutCol, planContract) ) { ByFunctionPredicate funcPred = outPred.pushToFunction(); byFunctionPredicates.add(funcPred); pushedToFunction = true; } } //////////////////////////////////////////////////////////////////////// // The following is an example of predicate push-down to input, // where input column names are different // from output column names. // Semantically, the md5sum column doesn't have an input counter-part, // so the predicate on that column cannot be pushed down. // Thus, some predicates can be pushed down to input, and some cannot // depending on which columns are involved in the predicate. // This is specified in the completePlanningContract method // if (!pushedToFunction) { // The helper function returns a 1-to-1 mapping of the output // columns mapped to their input counterparts. // If no such mapping can be made (for instance, if the // output column corresponds to an md5sum for which there is no // corresponding input), the helper function returns null ListColumnDefinition mappedCols = mapOutputColumnsToInput (predOutCols); if (mappedCols != null) { OnInputPredicate onInputPredicate = outPred .pushToInput(mappedCols); onInputPredicates.add(onInputPredicate); } } } // for each outPred // input name should be obtained from runtime contract, not the // optional PlanInputInfo String inputName = runtimeContractInputInfo_.getInputName(); InputContext inputContext = new InputContext.Builder() .setInputName(inputName) .setInputColumnsNeeded(inputColumnsNeeded) .setOnInputPredicates(onInputPredicates) .toInputContext(); inputContexts.add(inputContext); } // has query context info //////////////////////////////////////////////////////////////////////// // The following is an example of determining whether the OrderLimit // can be applied. // The OrderLimit can be applied by the // function if either of the following is true: // 1. there are no order by columns in OrderLimit // or // 2. the query's order by columns are a prefix subsequence of the // ordered output columns // // Example: // In: // select a, b, c from foo() order by c, b limit 100 // If foo's output guarantees ordering on c, b,... then we // can apply the limit on a per worker basis. // // The following code would be appear in the method // completePlanningContract. // boolean willApplyLimit = false; if (planContract.hasQueryContextInfo() && qci.hasOrderLimit() && planContract.hasPlanOutputInfo()){ // assume here that the PlanOutputInfo has already be set by the // function. planOutputInfo = planContract.getPlanOutputInfo(); assert planOutputInfo != null; // It's safe to apply limit // if there are no order by // columns. if (orderLimit.getOrderDefinitions().isEmpty()) { willApplyLimit = true; } // The helper method areOrderCompatible returns true if // the limit's order by columns are a prefix of the // ordered output columns, and false otherwise. else if (planOutputInfo.hasOrder() && areOrdersCompatible (planOutputInfo.getOrder().getOrderDefinitions(), orderLimit.getOrderDefinitions())) { willApplyLimit = true; } else { willApplyLimit = false; } } QueryContextReply qcr = new QueryContextReply.Builder() .setProjectOutputColumns(shouldProjectOutputColumns) .setInputContexts(inputContexts) .setByFunctionPredicates(byFunctionPredicates) .setApplyLimit(willApplyLimit) .toQueryContextReply(); planContract.setQueryContextReply(qcr); planContract.complete(); } /* Taking the output columns to project as an argument, * this method finds an input column (if any) * that does not appear in the output column projection. It returns this * column to be added to the list of input columns needed. */ ListColumnDefinition getInputColNotInOutputProjection (ListColumnDefinition outputColumnsToProject) { ListColumnDefinition inputColumnsNeeded = new ArrayListColumnDefinition(); for (ColumnDefinition colDef: runtimeSchema_.getOutputCols()) { if (!colDef.getColumnName().endsWith(md5sumSuffix_)) { // if the corresponding column does not appear in the output, // add the column to the inputColumnsNeeded list. int outputIndex = ColumnDefinition.indexOfColumn(outputColumnsToProject, colDef.getColumnName()); // if output column does not appear in the projection if (outputIndex == -1) { Integer inputIndex = runtimeSchema_.getRuntimeInputIndex(colDef.getColumnName()); assert inputIndex != null; ListColumnDefinition inputCols = runtimeSchema_.getInputCols(); inputColumnsNeeded.add(inputCols.get(inputIndex)); } } } return inputColumnsNeeded; } /* Returns true if the order of the OrderLimit is a prefix of the * order of the OutputOrder */ boolean areOrdersCompatible (ListOrderDefinition poDefs, ListOrderDefinition olDefs) { if (poDefs == null) { return false; } if (olDefs.size() > poDefs.size()) { return false; } for (int i=0; i<olDefs.size(); i++) { OrderDefinition poDef = poDefs.get(i); OrderDefinition olDef = olDefs.get(i); if (!poDef.getColumnName().equals(olDef.getColumnName()) || poDef.isAscending() != olDef.isAscending() || poDef.isNullsFirst() != olDef.isNullsFirst() ) { return false; } } return true; } /* Returns when the column data is in ascending order */ boolean isColAscending (ColumnDefinition outCol, PlanningContract planContract) { if (outCol.getColumnName().endsWith(md5sumSuffix_)) { return false; } if (planContract.hasPlanInputInfo()) { PlanInputInfo pii = planContract.getPlanInputInfo(); if (pii.hasOrder()) { Order order = pii.getOrder(); ListOrderDefinition orderDefs = order.getOrderDefinitions(); // remove prefix before comparing output column name with input // column name String outColName = outCol.getColumnName(); if (outColName.startsWith(prefix_)) { outColName = outColName.split(prefix_)[1]; } for (OrderDefinition orderDef: orderDefs) { if (orderDef.getColumnName().equals(outColName) && orderDef.getColumnType() == outCol.getColumnType()) { return orderDef.isAscending(); } } } } return false; } // This helper function specifies the 1-to-1 mapping of the output // columns mapped to their input counterparts. // Columns A and B are pass through and only predicates on these columns // can be pushed to the input. // Any output column that corresponds to an md5sum is clearly not // pass through, and so any predicate that contains such a column cannot // be pushed to the input. ListColumnDefinition mapOutputColumnsToInput (List<ColumnDefinition> predOutCols) { ListColumnDefinition inCols = runtimeContractInputInfo_.getColumns(); ListColumnDefinition outCols = runtimeContractOutputInfo_.getColumns(); ListColumnDefinition mappedCols = new ArrayListColumnDefinition(); int numMapped = 0; for (ColumnDefinition predOutCol: predOutCols) { // // if any of the output columns correspond to columns containing // md5sum's, // the function cannot push the predicate to the input int outputIndex = ColumnDefinition.indexOfColumn(outCols, predOutCol.getColumnName()); if (outputIndex != -1 && outputIndex % 2 == 1) { return null; } // // If any of the output columns cannot be mapped to the input, // i.e., the corresponding input column is not passthrough, // the function cannot push the predicate to the input. // // mapOutputColumnToInput is a helper function that // determines which input column if any is mappable to the // output columns. // If the output column cannot be mapped to any // of the inputs, the helper function returns null // Integer inputIndex = mapOutputColumnToInput (predOutCol); if (inputIndex == null) { return null; } mappedCols.add(inCols.get(inputIndex)); numMapped++; } if (numMapped != predOutCols.size()) { return null; } else { return mappedCols; } } // // Helper method to map the output column to an input column. // If the mapping does not exist, the method returns null. // The method assumes that the input and output indices corrresponding to the // the passthrough columns have been set as instance variables. // In particular, // column A is at index passThruColA_inputInd_ in the input, and is at // index passThruColA_outputInd_ in the output. The same goes for column B. // Typically, these indices can be set in the SQL-analytic method's // constructor where the name of the pass-through column could be given as a // SQL-analytic function argument, for example, and a lookup of its // corresponding index in the schema can be done. // Integer mapOutputColumnToInput (ColumnDefinition predOutCol) { ListColumnDefinition outCols = runtimeContractOutputInfo_.getColumns(); int outputIndex = ColumnDefinition.indexOfColumn(outCols, predOutCol.getColumnName()); // One should ensure a 1-to-1 mapping, possibly by tracking which columns // have been mapped if (outputIndex == passThruColA_outputInd_) { return passThruColA_inputInd_; } else if (outputIndex == passThruColB_outputInd_) { return passThruColB_inputInd_; } else { return null; } } public void receiveCompletedPlanningContract( PlanningContract.CompletedContract planContract ){ System.out.println ("[MD5SUM_CBP] Receiving completed planning contract"); complPlanContract_ = planContract; processCompletedPlanningContract (planContract); } private void processCompletedPlanningContract ( PlanningContract.CompletedContract planContract) { QueryContextInfo qci; QueryContextReply qcr; if (complPlanContract_ != null && complPlanContract_.hasQueryContextInfo() && complPlanContract_.hasQueryContextReply()) { qci = complPlanContract_.getQueryContextInfo(); qcr = complPlanContract_.getQueryContextReply(); if (qcr.isProjectOutputColumns()) { // if the function previously agreed to conduct // output column projection, get the corresponding projected // output schema finalOutputColumns_ = qci.getOutputColumnsToProject(); } ListByFunctionPredicate byFunctionPredicates = qcr.getByFunctionPredicates(); if (!byFunctionPredicates.isEmpty() && byFunctionPredicates.get(0).getOperatorType() == OperatorType.LT) { byFunctionPredicateLT_ = byFunctionPredicates.get(0); ListColumnDefinition byFunctionColDefs = null; byFunctionColDefs = byFunctionPredicateLT_.getColumns(); byFunctionColDef_ = byFunctionColDefs.get(0); } if (qci.hasOrderLimit() && qcr.isApplyLimit()) { OrderLimit orderLimit = qci.getOrderLimit(); limit_ = orderLimit.getLimit(); } } if (finalOutputColumns_ == null) { // if there is no output column projection to be done, // get the orginal output schema from the runtime contract. finalOutputColumns_ = runtimeContractOutputInfo_.getColumns(); } rowSqlTypes_ = ColumnDefinition.typesFromColumns (finalOutputColumns_); } static public class RuntimeSchema { private HashMapString, Integer inputColumnNameToIndexMap; private HashMapString, Integer outputColumnNameToIndexMap; private ListColumnDefinition inputCols; private ListColumnDefinition outputCols; private final String prefix_; public RuntimeSchema (ListColumnDefinition runtimeInputColumns, ListColumnDefinition runtimeOutputColumns, String prefix) { inputCols = runtimeInputColumns; outputCols = runtimeOutputColumns; inputColumnNameToIndexMap = new HashMapString, Integer(); outputColumnNameToIndexMap = new HashMapString, Integer(); for (int i=0; i<runtimeInputColumns.size(); i++) { inputColumnNameToIndexMap.put( runtimeInputColumns.get(i).getColumnName(), i); } for (int i=0; i<runtimeOutputColumns.size(); i++) { outputColumnNameToIndexMap.put( runtimeOutputColumns.get(i).getColumnName(), i); } this.prefix_ = prefix; } public ListColumnDefinition getInputCols() { return inputCols; } public ListColumnDefinition getOutputCols() { return outputCols; } public Integer getRuntimeInputIndex (String columnName) { // remove any output column prefix // when looking for corresponding column in the input if (columnName.length() != 0 && columnName.startsWith(prefix_)) { columnName = columnName.split(prefix_)[1]; } return inputColumnNameToIndexMap.get(columnName); } public Integer getRuntimeOutputIndex (String columnName) { return outputColumnNameToIndexMap.get(columnName); } } }