Code - Aster Execution Engine

Teradata Aster® Developer Guide

Product
Aster Execution Engine
Release Number
7.00.02
Published
July 2017
Language
English (United States)
Last Update
2018-04-13
dita:mapPath
xnl1494366523182.ditamap
dita:ditavalPath
Generic_no_ie_no_tempfilter.ditaval
dita:id
ffu1489104705746
lifecycle
previous
Product Category
Software
/*
* Unpublished work.
* Copyright (c) 2013 by Teradata Corporation. All rights reserved.
* TERADATA CORPORATION CONFIDENTIAL AND TRADE SECRET
*/
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import com.asterdata.ncluster.sqlmr.ApiVersion;
import com.asterdata.ncluster.sqlmr.ByFunctionPredicate;
import com.asterdata.ncluster.sqlmr.ClientVisibleException;
import com.asterdata.ncluster.sqlmr.CollaborativePlanning;
import com.asterdata.ncluster.sqlmr.data.ColumnDefinition;
import com.asterdata.ncluster.sqlmr.data.PartitionDefinition;
import com.asterdata.ncluster.sqlmr.data.RowEmitter;
import com.asterdata.ncluster.sqlmr.data.RowIterator;
import com.asterdata.ncluster.sqlmr.data.SqlType;
import com.asterdata.ncluster.sqlmr.data.ValueHolder;
import com.asterdata.ncluster.sqlmr.Distribution;
import com.asterdata.ncluster.sqlmr.DistributionType;
import com.asterdata.ncluster.sqlmr.HelpInfo;
import com.asterdata.ncluster.sqlmr.IllegalUsageException;
import com.asterdata.ncluster.sqlmr.InputContext;
import com.asterdata.ncluster.sqlmr.InputInfo;
import com.asterdata.ncluster.sqlmr.OperatorType;
import com.asterdata.ncluster.sqlmr.Order;
import com.asterdata.ncluster.sqlmr.OrderDefinition;
import com.asterdata.ncluster.sqlmr.OrderLimit;
import com.asterdata.ncluster.sqlmr.OutputInfo;
import com.asterdata.ncluster.sqlmr.PartitionFunction;
import com.asterdata.ncluster.sqlmr.PlanInputInfo;
import com.asterdata.ncluster.sqlmr.PlanningContract;
import com.asterdata.ncluster.sqlmr.PlanOutputInfo;
import com.asterdata.ncluster.sqlmr.OnInputPredicate;
import com.asterdata.ncluster.sqlmr.OnOutputPredicate;
import com.asterdata.ncluster.sqlmr.QueryContextInfo;
import com.asterdata.ncluster.sqlmr.QueryContextReply;
import com.asterdata.ncluster.sqlmr.RowFunction;
import com.asterdata.ncluster.sqlmr.RuntimeContract;
import com.asterdata.ncluster.sqlmr.util.AsterVersion;
import com.asterdata.ncluster.util.ImmutableList;

@HelpInfo(
   // The only optional argument clause is cbpmode, whose single accepted
   // value is 'outputColRename'; the constructor rejects any other value.
   usageSyntax
      = "md5sum_cbp("
      + " on ..."
      + " [ cbpmode('outputColRename') ]"
      + " )",
   shortDescription = "Echoes the input columns in addition to the md5sum of "
                      + "each column value",
   longDescription =
         "By default, the function will have the following behavior:\n"
         + "* The function will pass through the input Distribution"
         + " and Order to the output.\n"
         + "* The OrderLimit will be applied when it appears in the query context.\n"
         + "* If the predicate has operator type '<' and is applied to an"
         + " ascending column, it will be applied by the function. Otherwise,"
         + " the predicate will be pushed to the input.\n"
         + "* The function will agree to conduct output column projection should"
         + " the planner ask for it.\n"
         + "* The function will specify the input column projection if there is"
         + " output column projection. A column from the input that"
         + " does not appear in the output will be added to the input columns"
         + " needed list when possible.\n\n"

         + "The function's default behavior can be changed with the following"
         + " argument:\n"
         + " cbpmode('outputColRename'): Rename the output columns by adding a prefix"
         + " to the input column name. There are also the following side-effects"
         + " when specifying this argument:\n"
         + "* The column names in the Order and Distribution of the PlanOutputInfo"
         + " will be renamed by prepending the prefix 'prefix_'.\n"
         + "* The output Order will be the input Order without its last"
         + " column\n"
         + "* The distribution will be effectively removed, i.e., if the input"
         + " Distribution is of type 'distributed', the output"
         + " distribution will be set to 'any'.",

   inputColumns = "col1, col2, ..., coln",
   outputColumns = "col1, col1_md5sum, col2, col2_md5sum, ..., coln, coln_md5sum",
   author = AsterVersion.ASTER_COPYRIGHT_STRING,
   version = AsterVersion.ASTER_VERSION_STRING,
   apiVersion = ApiVersion.CURRENT_API_VERSION
)

public final class md5sum_cbp implements RowFunction, PartitionFunction,
                                         CollaborativePlanning
{
   // True when the argument clause cbpmode('outputColRename') was given: the
   // output columns (and the output Order/Distribution) are renamed with prefix_.
   private boolean outputColRename_;

   // Example pass-through columns A and B: their index in the input schema and
   // their index in the runtime contract's output schema.
   private int passThruColA_inputInd_;
   private int passThruColA_outputInd_;

   private int passThruColB_inputInd_;
   private int passThruColB_outputInd_;

   // Rows emitted so far by this instance, and the row limit the function agreed
   // to apply (-1 when no limit is applied).
   private long numRowsEmitted_;
   private long limit_;

   // Input and output information of the runtime contract.
   private InputInfo runtimeContractInputInfo_;
   private OutputInfo runtimeContractOutputInfo_;

   // Column-name/index lookups between the input schema and the runtime output
   // schema (getRuntimeInputIndex / getRuntimeOutputIndex).
   private RuntimeSchema runtimeSchema_;
   // Columns actually emitted (after any output column projection) and their types.
   private List<ColumnDefinition> finalOutputColumns_;
   private List<SqlType> rowSqlTypes_;
   // The '<' predicate pushed to the function, and the column it applies to.
   private ByFunctionPredicate byFunctionPredicateLT_;
   private ColumnDefinition byFunctionColDef_;

   private PlanningContract.CompletedContract complPlanContract_;

   private final String prefix_ = "prefix_";
   private final String md5sumSuffix_ = "_md5sum";

   /**
    * Builds the runtime contract.
    * <p>
    * The output schema interleaves every input column with its md5sum column:
    * {@code col0, col0_md5sum, col1, col1_md5sum, ...}. With
    * {@code cbpmode('outputColRename')} every output name is prefixed with
    * {@code prefix_}.
    *
    * @param contract the runtime contract to complete
    * @throws IllegalUsageException if cbpmode has a value other than outputColRename
    * @throws ClientVisibleException if an input column name ends with the md5sum suffix
    */
   public md5sum_cbp(RuntimeContract contract)
   {
      outputColRename_ = false;

      // Example pass-through columns A and B. Because the output schema built
      // below interleaves each input column with its md5sum column, input column
      // i is found at output index 2 * i. Output index 1 is the md5sum of
      // column A, so column B (input index 1) lives at output index 2.
      passThruColA_inputInd_ = 0;
      passThruColA_outputInd_ = 2 * passThruColA_inputInd_;

      passThruColB_inputInd_ = 1;
      passThruColB_outputInd_ = 2 * passThruColB_inputInd_;

      numRowsEmitted_ = 0;
      // -1 means "no limit"; it is only replaced when the function agrees to
      // apply an OrderLimit during collaborative planning.
      limit_ = -1;

      InputInfo inputInfo = contract.getInputInfo();
      runtimeContractInputInfo_ = inputInfo;

      // Determine which collaborative planning optimizations to enable
      if (contract.hasArgumentClause("cbpmode"))
      {
         String cbpmode = contract.useArgumentClause("cbpmode").getSingleValue();

         System.out.println ("cbpmode: " + cbpmode);
         if (cbpmode.equalsIgnoreCase("outputColRename"))
         {
            outputColRename_ = true;
         }
         else
         {
            throw new IllegalUsageException ("cbpmode: " + cbpmode
                  + " is invalid");
         }
      }

      // Validate the input schema before deriving the output schema from it.
      ImmutableList<ColumnDefinition> inputSchema = inputInfo.getColumns();
      checkInputSchema (inputSchema);

      // The output information is taken directly from the input information:
      // each (optionally renamed) input column followed by its md5sum column.
      List<ColumnDefinition> outputColumns = new ArrayList<ColumnDefinition>();
      List<SqlType> columnTypes = ColumnDefinition.typesFromColumns(inputSchema);

      for (int i = 0; i < columnTypes.size(); ++i)
      {
         String columnName = inputInfo.getColumnName(i);
         if (outputColRename_)
         {
            // Rename the output column by prepending prefix_ to the input name.
            columnName = prefix_ + columnName;
         }
         outputColumns.add(new ColumnDefinition(columnName,
                           columnTypes.get(i)));
         outputColumns.add(new ColumnDefinition(columnName
               + md5sumSuffix_, SqlType.getType("character varying")));
      }

      runtimeContractOutputInfo_ = new OutputInfo( outputColumns );
      runtimeSchema_ = new RuntimeSchema (inputSchema, outputColumns, prefix_);

      contract.setOutputInfo( runtimeContractOutputInfo_ );
      contract.complete();
   }

   /**
    * Rejects input schemas whose column names already end with the md5sum
    * suffix, since such names would be ambiguous with the generated md5sum
    * output columns.
    *
    * @param cols the input columns to check
    * @throws ClientVisibleException if any column name ends with md5sumSuffix_
    */
   private void checkInputSchema (List<ColumnDefinition> cols)
   {
      for (ColumnDefinition col: cols)
      {
         if (col.getColumnName().endsWith(md5sumSuffix_))
         {
            throw new ClientVisibleException ("Input column name cannot have suffix "
                  + md5sumSuffix_);
         }
      }
   }

   /**
    * Returns the MD5 digest of {@code value} as a 32-character lower-case
    * hexadecimal string, or {@code null} when {@code value} is {@code null}.
    * <p>
    * The string is encoded as UTF-8 and the digest covers every encoded byte.
    * Previously only {@code value.length()} bytes of the platform-charset
    * encoding were hashed, which truncated the input for non-ASCII text.
    *
    * @param value the string to hash; may be null
    * @return the hex MD5 digest, or null for a null input
    * @throws IllegalStateException if the runtime provides no MD5 implementation
    */
   public String calculateMd5sum(String value)
   {
      if (value == null)
      {
         return null;
      }
      MessageDigest md5;
      try
      {
         md5 = MessageDigest.getInstance("MD5");
      }
      catch (NoSuchAlgorithmException e)
      {
         // Every Java platform implementation is required to provide MD5, so
         // this signals a broken runtime rather than bad input: fail loudly.
         throw new IllegalStateException("MD5 MessageDigest is not available", e);
      }
      byte[] digest = md5.digest(value.getBytes(StandardCharsets.UTF_8));
      // Zero-pad to 32 hex digits; the signum 1 keeps the value non-negative.
      return String.format("%1$032x", new BigInteger(1, digest));
   }

   /**
    * Emits one output row per input row: each projected input column and/or
    * its md5sum.
    * <p>
    * Before a row is built, the '&lt;' predicate pushed to the function (if any)
    * is evaluated. Because the predicate column is ascending, the first row
    * that fails it ends the iteration. Rows whose predicate column is NULL are
    * skipped, since {@code NULL < c} is never true. An agreed order limit stops
    * the iteration once that many rows have been emitted.
    */
   public void operateOnSomeRows(
      RowIterator inputIterator,
      RowEmitter outputEmitter
      )
   {
      while (inputIterator.advanceToNextRow())
      {
         // Enforce the (order) limit, if the function agreed to apply it in the
         // QueryContextReply. numRowsEmitted_ is an instance field initialized
         // once in the constructor, so the count persists across calls to this
         // method, including calls made through operateOnPartition.
         if (limit_ != -1 && numRowsEmitted_ >= limit_)
         {
            break;
         }

         // Decide on the pushed-down predicate before anything is added to the
         // emitter, so a rejected row never leaves a partially built row behind.
         ByFunctionVerdict verdict = evaluateByFunctionPredicate(inputIterator);
         if (verdict == ByFunctionVerdict.STOP)
         {
            break;
         }
         if (verdict == ByFunctionVerdict.SKIP)
         {
            continue;
         }

         buildOutputRow(inputIterator, outputEmitter);
         outputEmitter.emitRow();
         numRowsEmitted_++;
      } // while inputIterator
   }

   /* Outcome of applying the by-function '<' predicate to the current row. */
   private enum ByFunctionVerdict
   {
      EMIT, // predicate holds (or none was pushed): emit the row
      SKIP, // predicate column is NULL: the predicate cannot hold, drop this row
      STOP  // predicate is false on the ascending column: no later row can pass
   }

   /* Evaluates the '<' predicate pushed to the function on the current input row.
      The predicate column is always a pass-through column, so its value is read
      from the corresponding input column. This works even when the column was
      projected out of the final output. */
   private ByFunctionVerdict evaluateByFunctionPredicate(RowIterator inputIterator)
   {
      if (byFunctionPredicateLT_ == null || byFunctionColDef_ == null)
      {
         return ByFunctionVerdict.EMIT;
      }
      Integer inputIndex =
            runtimeSchema_.getRuntimeInputIndex(byFunctionColDef_.getColumnName());
      if (inputIndex == null)
      {
         // NOTE(review): the predicate column cannot be located in the runtime
         // schema, so the predicate cannot be evaluated; rows are left unfiltered.
         return ByFunctionVerdict.EMIT;
      }
      if (inputIterator.isNullAt(inputIndex))
      {
         return ByFunctionVerdict.SKIP;
      }
      ValueHolder vh = new ValueHolder(byFunctionColDef_.getColumnType());
      inputIterator.getValueAt(inputIndex, vh);
      return byFunctionPredicateLT_.evaluate(vh)
            ? ByFunctionVerdict.EMIT
            : ByFunctionVerdict.STOP;
   }

   /* Adds the values of one output row to outputEmitter, in the order of
      finalOutputColumns_. The caller is responsible for emitting the row.
      Even indices of the runtime contract's output schema are pass-through
      columns; odd indices are md5sums of the preceding column.
      Always returns outputEmitter. */
   private RowEmitter buildOutputRow(RowIterator inputIterator,
                                     RowEmitter outputEmitter)
   {
      assert (inputIterator != null);
      assert (outputEmitter != null);

      for (int finalOutputIndex = 0;
            finalOutputIndex < finalOutputColumns_.size();
            finalOutputIndex++)
      {
         ColumnDefinition finalOutputColDef =
               finalOutputColumns_.get(finalOutputIndex);
         String finalOutputColName = finalOutputColDef.getColumnName();
         Integer runtimeContractOutputIndex =
               runtimeSchema_.getRuntimeOutputIndex(finalOutputColName);

         if (runtimeContractOutputIndex % 2 == 0)
         {
            // Pass-through column: copy the value of the matching input column.
            Integer inputIndex =
                  runtimeSchema_.getRuntimeInputIndex(finalOutputColName);
            assert (inputIndex >= 0);
            assert (inputIndex < inputIterator.getColumnCount());
            if (inputIterator.isNullAt(inputIndex))
            {
               outputEmitter.addNull();
            }
            else
            {
               ValueHolder vh = new ValueHolder (rowSqlTypes_.get(finalOutputIndex));
               inputIterator.getValueAt(inputIndex, vh);
               outputEmitter.addValue(vh);
            }
         }
         else
         {
            // md5sum column: hash the value of the input column it was derived from.
            Integer inputIndex = runtimeSchema_.getRuntimeInputIndex(
                  stripMd5sumSuffix(finalOutputColName));
            assert (inputIndex >= 0);
            assert (inputIndex < inputIterator.getColumnCount());
            if (inputIterator.isNullAt(inputIndex))
            {
               outputEmitter.addNull();
            }
            else
            {
               String value = inputIterator.getStringAt(inputIndex);
               outputEmitter.addString(calculateMd5sum(value));
            }
         }
      } // for each finalOutputColumn
      return outputEmitter;
   }

   /* Removes a trailing md5sumSuffix_ from columnName, if present. Only the
      trailing occurrence is removed, so a name such as "a_md5sumb_md5sum"
      maps back to "a_md5sumb". */
   private String stripMd5sumSuffix(String columnName)
   {
      if (columnName.endsWith(md5sumSuffix_))
      {
         return columnName.substring(0,
               columnName.length() - md5sumSuffix_.length());
      }
      return columnName;
   }

   /**
    * Partition-function entry point. Rows of a partition are processed exactly
    * as in the row-function path, so this forwards to operateOnSomeRows. The
    * partition definition itself is not consulted.
    */
   public void operateOnPartition(
      PartitionDefinition partition,
      RowIterator inputIterator,
      RowEmitter outputEmitter)
   {
      this.operateOnSomeRows(inputIterator, outputEmitter);
   }

   /**
    * Completes the planning contract.
    * <ul>
    * <li>Sets the output Distribution and Order. They pass through, or are
    *     renamed and weakened under outputColRename.</li>
    * <li>Agrees to output column projection and derives the input columns needed.</li>
    * <li>Pushes at most one '&lt;' predicate on an ascending column to the function;
    *     other predicates are pushed to the input when they map onto
    *     pass-through columns.</li>
    * <li>Agrees to apply the OrderLimit when the output order allows it.</li>
    * </ul>
    */
   public void completePlanningContract(
      PlanningContract planContract
      )
   {
      // The PlanInputInfo is optional, so only read it when it is present.
      PlanInputInfo planInputInfo = planContract.hasPlanInputInfo()
            ? planContract.getPlanInputInfo()
            : null;
      QueryContextInfo qci = planContract.hasQueryContextInfo()
            ? planContract.getQueryContextInfo()
            : null;

      ///////////////////////////////////////////////////////////////////////
      // Output distribution and order.
      PlanOutputInfo planOutputInfo = new PlanOutputInfo.Builder()
         .setDistribution(buildPlanOutputDistribution(planInputInfo))
         .setOrder(buildPlanOutputOrder(planInputInfo))
         .toPlanOutputInfo();
      planContract.setPlanOutputInfo(planOutputInfo);

      ///////////////////////////////////////////////////////////////////////
      // Conduct output column projection whenever the planner asks for it.
      boolean shouldProjectOutputColumns = false;
      if (qci != null && !qci.getOutputColumnsToProject().isEmpty())
      {
         shouldProjectOutputColumns = true;
         finalOutputColumns_ = qci.getOutputColumnsToProject();
      }
      else
      {
         finalOutputColumns_ = runtimeSchema_.getOutputCols();
      }

      List<InputContext> inputContexts = new ArrayList<InputContext>();
      List<ByFunctionPredicate> byFunctionPredicates =
            new ArrayList<ByFunctionPredicate>();

      if (qci != null)
      {
         // Input column projection (empty when there is no output projection).
         List<ColumnDefinition> inputColumnsNeeded =
               buildInputColumnsNeeded(qci.getOutputColumnsToProject());

         List<OnInputPredicate> onInputPredicates = new ArrayList<OnInputPredicate>();
         List<OnOutputPredicate> outPreds = qci.getOnOutputPredicates();
         if (outPreds != null)
         {
            for (OnOutputPredicate outPred : outPreds)
            {
               boolean pushedToFunction = false;
               List<ColumnDefinition> predOutCols = outPred.getColumns();

               ////////////////////////////////////////////////////////////////
               // Predicate push-down to the function: a simple '<' constraint
               // on a column produced in ascending order (never an md5sum).
               // processCompletedPlanningContract keeps only the first
               // by-function predicate, so at most one is accepted here.
               // Further candidates fall through to the input push-down below
               // or remain with the planner, instead of being silently dropped.
               // A simple constraint involves a single column.
               if (byFunctionPredicates.isEmpty()
                     && outPred.isSimpleConstraint()
                     && outPred.getOperatorType() == OperatorType.LT
                     && isColAscending(predOutCols.get(0), planContract))
               {
                  byFunctionPredicates.add(outPred.pushToFunction());
                  pushedToFunction = true;
               }

               ////////////////////////////////////////////////////////////////
               // Predicate push-down to the input. This is possible only when
               // every predicate column maps 1-to-1 onto an input column.
               // md5sum columns have no input counterpart, so
               // mapOutputColumnsToInput returns null for them.
               if (!pushedToFunction)
               {
                  List<ColumnDefinition> mappedCols =
                        mapOutputColumnsToInput(predOutCols);
                  if (mappedCols != null)
                  {
                     onInputPredicates.add(outPred.pushToInput(mappedCols));
                  }
               }
            } // for each outPred
         }

         // The input name must come from the runtime contract, not from the
         // optional PlanInputInfo.
         String inputName = runtimeContractInputInfo_.getInputName();
         InputContext inputContext = new InputContext.Builder()
            .setInputName(inputName)
            .setInputColumnsNeeded(inputColumnsNeeded)
            .setOnInputPredicates(onInputPredicates)
            .toInputContext();
         inputContexts.add(inputContext);
      } // has query context info

      ////////////////////////////////////////////////////////////////////////
      // The OrderLimit can be applied by the function if either:
      // 1. the OrderLimit has no order-by columns, or
      // 2. its order-by columns are a prefix of the ordered output columns.
      // Example: for "select a, b, c from foo() order by c, b limit 100", if
      // foo's output is ordered on c, b, ... the limit can be applied per worker.
      boolean willApplyLimit = false;
      if (qci != null && qci.hasOrderLimit() && planContract.hasPlanOutputInfo())
      {
         OrderLimit orderLimit = qci.getOrderLimit();
         PlanOutputInfo completedOutputInfo = planContract.getPlanOutputInfo();
         assert completedOutputInfo != null;

         if (orderLimit.getOrderDefinitions().isEmpty())
         {
            willApplyLimit = true;
         }
         else if (completedOutputInfo.hasOrder()
               && areOrdersCompatible(
                     completedOutputInfo.getOrder().getOrderDefinitions(),
                     orderLimit.getOrderDefinitions()))
         {
            willApplyLimit = true;
         }
      }

      QueryContextReply qcr = new QueryContextReply.Builder()
         .setProjectOutputColumns(shouldProjectOutputColumns)
         .setInputContexts(inputContexts)
         .setByFunctionPredicates(byFunctionPredicates)
         .setApplyLimit(willApplyLimit)
         .toQueryContextReply();
      planContract.setQueryContextReply(qcr);

      planContract.complete();
   }

   /* Builds the output Distribution from the (possibly null) PlanInputInfo.
      Returns null when there is no input distribution. Without outputColRename
      the input distribution passes through. With it, the distribution columns
      are renamed with prefix_, and 'distributed' is weakened to 'any'. */
   private Distribution buildPlanOutputDistribution(PlanInputInfo planInputInfo)
   {
      if (planInputInfo == null || !planInputInfo.hasDistribution())
      {
         return null;
      }
      Distribution inDistribution = planInputInfo.getDistribution();
      if (!outputColRename_)
      {
         return inDistribution;
      }

      List<ColumnDefinition> outDistrCols = new ArrayList<ColumnDefinition>();
      List<ColumnDefinition> inDistrCols = inDistribution.getDistributionColumns();
      if (inDistrCols != null)
      {
         for (ColumnDefinition inDistrCol : inDistrCols)
         {
            // The runtime output column names also carry prefix_.
            outDistrCols.add(new ColumnDefinition(prefix_
                  + inDistrCol.getColumnName(), inDistrCol.getColumnType()));
         }
      }

      DistributionType inDistrType = inDistribution.getDistributionType();
      DistributionType outDistrType = (inDistrType == DistributionType.distributed)
            ? DistributionType.any
            : inDistrType;

      return new Distribution.Builder()
         .setDistributionColumns(outDistrCols)
         .setDistributionType(outDistrType)
         .toDistribution();
   }

   /* Builds the output Order from the (possibly null) PlanInputInfo. Returns
      null when there is no input order. Without outputColRename the input
      order passes through. With it, the output order is the input order minus
      its last column (a,b,c -> a,b), with each column renamed with prefix_. */
   private Order buildPlanOutputOrder(PlanInputInfo planInputInfo)
   {
      if (planInputInfo == null || !planInputInfo.hasOrder())
      {
         return null;
      }
      Order inOrder = planInputInfo.getOrder();
      if (!outputColRename_)
      {
         return inOrder;
      }

      List<OrderDefinition> inOrderDefs = inOrder.getOrderDefinitions();
      List<OrderDefinition> outOrderDefs = new ArrayList<OrderDefinition>();
      for (int i = 0; i < inOrderDefs.size() - 1; i++)
      {
         OrderDefinition inOrderDef = inOrderDefs.get(i);
         outOrderDefs.add(new OrderDefinition(prefix_
               + inOrderDef.getColumnName(), inOrderDef.getColumnType(),
               inOrderDef.isAscending(), inOrderDef.isNullsFirst()));
      }
      return new Order.Builder()
         .setOrderDefinitions(outOrderDefs)
         .toOrder();
   }

   /* Returns the input columns needed for the given output column projection.
      The list is empty when there is no projection, since the planner expects
      it only in that case. Each input column appears once. The list holds the
      input columns not in the projection first, then the input counterpart of
      every projected column. A projected md5sum column needs the input column
      it hashes. */
   private List<ColumnDefinition> buildInputColumnsNeeded(
         List<ColumnDefinition> outputColumnsToProject)
   {
      List<ColumnDefinition> inputColumnsNeeded = new ArrayList<ColumnDefinition>();
      if (outputColumnsToProject.isEmpty())
      {
         return inputColumnsNeeded;
      }

      // De-duplicate on the input column name. Output names may carry prefix_
      // or the md5sum suffix, so they cannot be compared with input names.
      HashSet<String> inputNamesAdded = new HashSet<String>();
      for (ColumnDefinition extraCol
            : getInputColNotInOutputProjection(outputColumnsToProject))
      {
         if (inputNamesAdded.add(extraCol.getColumnName()))
         {
            inputColumnsNeeded.add(extraCol);
         }
      }

      List<ColumnDefinition> inputCols = runtimeSchema_.getInputCols();
      for (ColumnDefinition colDef : outputColumnsToProject)
      {
         assert (colDef != null);
         // e.g. a projection of "id" and "number_md5sum" needs "id" and "number".
         String columnName = colDef.getColumnName();
         if (columnName.endsWith(md5sumSuffix_))
         {
            columnName = columnName.substring(0,
                  columnName.length() - md5sumSuffix_.length());
         }
         Integer inputColIndex = runtimeSchema_.getRuntimeInputIndex(columnName);
         assert inputColIndex != null;
         ColumnDefinition inputColumn = inputCols.get(inputColIndex);
         if (inputNamesAdded.add(inputColumn.getColumnName()))
         {
            inputColumnsNeeded.add(inputColumn);
         }
      }
      return inputColumnsNeeded;
   }

  /*
   * Returns every input column whose pass-through output column does not
   * appear (by name) in outputColumnsToProject, in runtime output order.
   * md5sum output columns are ignored in the scan. An input column whose only
   * projected output is its md5sum is therefore also returned.
   * These columns are added to the list of input columns needed.
   */
   List<ColumnDefinition> getInputColNotInOutputProjection (List<ColumnDefinition> outputColumnsToProject)
   {
      List<ColumnDefinition> inputCols = runtimeSchema_.getInputCols();
      List<ColumnDefinition> inputColumnsNeeded = new ArrayList<ColumnDefinition>();

      for (ColumnDefinition colDef: runtimeSchema_.getOutputCols())
      {
         String outputName = colDef.getColumnName();
         if (outputName.endsWith(md5sumSuffix_))
         {
            // md5sum columns have no input column of their own.
            continue;
         }
         if (ColumnDefinition.indexOfColumn(outputColumnsToProject, outputName) != -1)
         {
            // Already covered by the projection.
            continue;
         }
         Integer inputIndex = runtimeSchema_.getRuntimeInputIndex(outputName);
         assert inputIndex != null;
         inputColumnsNeeded.add(inputCols.get(inputIndex));
      }
      return inputColumnsNeeded;
   }

   /* Returns true if olDefs (the OrderLimit's order) is a prefix of poDefs
    * (the PlanOutputInfo's order). Position by position, the column name,
    * direction and null placement must match. Returns false when poDefs is
    * null or shorter than olDefs. */
   boolean areOrdersCompatible (List<OrderDefinition> poDefs,
         List<OrderDefinition> olDefs)
   {
      if (poDefs == null || olDefs.size() > poDefs.size())
      {
         return false;
      }

      for (int i = 0; i < olDefs.size(); i++)
      {
         OrderDefinition poDef = poDefs.get(i);
         OrderDefinition olDef = olDefs.get(i);

         if (!poDef.getColumnName().equals(olDef.getColumnName())
               || poDef.isAscending() != olDef.isAscending()
               || poDef.isNullsFirst() != olDef.isNullsFirst())
         {
            return false;
         }
      }
      return true;
   }

   /* Returns true when the function produces outCol's data in ascending order
    * over the whole input stream. This requires all of the following:
    * - outCol is a pass-through column, not an md5sum;
    * - it is the LEADING column of the PlanInputInfo's Order;
    * - that order column is ascending.
    * Only the leading order column is monotonic. For an input ordered by
    * (a, b), b restarts for every new value of a, so a '<' predicate on b
    * cannot be used to stop reading the input early. */

   boolean isColAscending (ColumnDefinition outCol, PlanningContract planContract)
   {
      String outColName = outCol.getColumnName();
      if (outColName.endsWith(md5sumSuffix_))
      {
         return false;
      }
      if (!planContract.hasPlanInputInfo())
      {
         return false;
      }
      PlanInputInfo pii = planContract.getPlanInputInfo();
      if (!pii.hasOrder())
      {
         return false;
      }
      List<OrderDefinition> orderDefs = pii.getOrder().getOrderDefinitions();
      if (orderDefs == null || orderDefs.isEmpty())
      {
         return false;
      }

      // Output names carry prefix_ only under outputColRename; strip exactly
      // that leading prefix to obtain the input column name.
      if (outputColRename_ && outColName.startsWith(prefix_))
      {
         outColName = outColName.substring(prefix_.length());
      }

      OrderDefinition leadingDef = orderDefs.get(0);
      return leadingDef.isAscending()
            && leadingDef.getColumnName().equals(outColName)
            && leadingDef.getColumnType() == outCol.getColumnType();
   }

   // Maps each predicate output column to its input counterpart, 1-to-1 and in
   // order. Returns null when the predicate cannot be pushed to the input.
   // Only the example pass-through columns A and B can be mapped (see
   // mapOutputColumnToInput). An md5sum output column (odd index in the runtime
   // output schema) has no input counterpart, so any predicate that involves
   // one also yields null.

   List<ColumnDefinition> mapOutputColumnsToInput (List<ColumnDefinition> predOutCols)
   {
      List<ColumnDefinition> inCols = runtimeContractInputInfo_.getColumns();
      List<ColumnDefinition> outCols = runtimeContractOutputInfo_.getColumns();

      List<ColumnDefinition> mappedCols =
            new ArrayList<ColumnDefinition>(predOutCols.size());

      for (ColumnDefinition predOutCol: predOutCols)
      {
         int outputIndex = ColumnDefinition.indexOfColumn(outCols,
                  predOutCol.getColumnName());
         if (outputIndex != -1 && outputIndex % 2 == 1)
         {
            // md5sum column: not pass-through.
            return null;
         }

         Integer inputIndex = mapOutputColumnToInput (predOutCol);
         if (inputIndex == null)
         {
            // Not a mappable pass-through column.
            return null;
         }
         mappedCols.add(inCols.get(inputIndex));
      }
      // Every column was mapped (any failure returned early above).
      return mappedCols;
   }

   //
   // Helper method to map an output column to an input column.
   // Returns the input index, or null when the output column is not one of
   // the example pass-through columns A and B, or is not in the runtime
   // output schema.
   // The method assumes that the input and output indices corresponding to
   // the pass-through columns have been set as instance variables. Column A
   // is at index passThruColA_inputInd_ in the input and at index
   // passThruColA_outputInd_ in the output. The same goes for column B.
   // Typically, these indices can be set in the SQL-analytic function's
   // constructor, where the name of the pass-through column could be given as
   // a SQL-analytic function argument and its index looked up in the schema.
   //

   Integer mapOutputColumnToInput (ColumnDefinition predOutCol)
   {
      List<ColumnDefinition> outCols = runtimeContractOutputInfo_.getColumns();

      int outputIndex = ColumnDefinition.indexOfColumn(outCols,
               predOutCol.getColumnName());

      if (outputIndex == -1)
      {
         // Not a column of the runtime output schema: nothing to map.
         return null;
      }

      // One should ensure a 1-to-1 mapping, possibly by tracking which columns
      // have been mapped.
      if (outputIndex == passThruColA_outputInd_)
      {
         return passThruColA_inputInd_;
      }
      if (outputIndex == passThruColB_outputInd_)
      {
         return passThruColB_inputInd_;
      }
      return null;
   }

/**
 * Accepts a completed planning contract.
 *
 * Prints a trace line to standard output, remembers the contract in
 * {@code complPlanContract_}, and then hands the stored contract to
 * {@code processCompletedPlanningContract}.
 *
 * @param planContract the completed planning contract to record and apply
 */
public void receiveCompletedPlanningContract(
      PlanningContract.CompletedContract planContract)
{
   System.out.println ("[MD5SUM_CBP] Receiving completed planning contract");
   this.complPlanContract_ = planContract;
   // The field was just assigned from planContract, so this passes the
   // same object the caller supplied.
   processCompletedPlanningContract (this.complPlanContract_);
}

/**
 * Applies the decisions recorded in the completed planning contract to this
 * instance's runtime state.
 *
 * When {@code complPlanContract_} is non-null and carries both query-context
 * info and a query-context reply, this method:
 *  - adopts the projected output schema if the reply says the function
 *    agreed to project output columns;
 *  - records the first by-function predicate, and that predicate's first
 *    column, when its operator is {@code OperatorType.LT};
 *  - records the limit value when the info has an order limit and the reply
 *    says the limit is to be applied.
 * Afterwards, if no output schema was chosen, it falls back to the runtime
 * contract's output columns. It then derives {@code rowSqlTypes_} from the
 * final output columns.
 *
 * @param planContract the completed planning contract.
 *        NOTE(review): the body reads the {@code complPlanContract_} field,
 *        not this parameter. The visible caller assigns the field to the
 *        same object immediately before calling.
 */
private void processCompletedPlanningContract (
                            PlanningContract.CompletedContract planContract)
{
   if (complPlanContract_ != null
         && complPlanContract_.hasQueryContextInfo()
         && complPlanContract_.hasQueryContextReply())
   {
      QueryContextInfo qci = complPlanContract_.getQueryContextInfo();
      QueryContextReply qcr = complPlanContract_.getQueryContextReply();

      if (qcr.isProjectOutputColumns())
      {
         // The function previously agreed to conduct output column
         // projection, so use the corresponding projected output schema.
         finalOutputColumns_ = qci.getOutputColumnsToProject();
      }

      List<ByFunctionPredicate> byFunctionPredicates
            = qcr.getByFunctionPredicates();
      if (!byFunctionPredicates.isEmpty()
            && byFunctionPredicates.get(0).getOperatorType() == OperatorType.LT)
      {
         // Only the first predicate in the reply is considered, and only when
         // it is a less-than predicate. Its first column is recorded in
         // byFunctionColDef_.
         byFunctionPredicateLT_ = byFunctionPredicates.get(0);
         List<ColumnDefinition> byFunctionColDefs =
               byFunctionPredicateLT_.getColumns();
         byFunctionColDef_ = byFunctionColDefs.get(0);
      }

      if (qci.hasOrderLimit() && qcr.isApplyLimit())
      {
         OrderLimit orderLimit = qci.getOrderLimit();
         limit_ = orderLimit.getLimit();
      }
   }

   if (finalOutputColumns_ == null)
   {
      // No output column projection is to be done: use the original
      // output schema from the runtime contract.
      finalOutputColumns_ = runtimeContractOutputInfo_.getColumns();
   }
   rowSqlTypes_ = ColumnDefinition.typesFromColumns (finalOutputColumns_);
}

static public class RuntimeSchema
{
   private HashMapString, Integer inputColumnNameToIndexMap;
   private HashMapString, Integer outputColumnNameToIndexMap;
   private ListColumnDefinition inputCols;
   private ListColumnDefinition outputCols;
   private final String prefix_;

   public RuntimeSchema (ListColumnDefinition runtimeInputColumns,
                      ListColumnDefinition runtimeOutputColumns, String prefix)
   {
      inputCols = runtimeInputColumns;
      outputCols = runtimeOutputColumns;

      inputColumnNameToIndexMap = new HashMapString, Integer();
      outputColumnNameToIndexMap = new HashMapString, Integer();

      for (int i=0; i<runtimeInputColumns.size(); i++)
      {
         inputColumnNameToIndexMap.put(
         runtimeInputColumns.get(i).getColumnName(), i);
      }

      for (int i=0; i<runtimeOutputColumns.size(); i++)
      {
         outputColumnNameToIndexMap.put(
         runtimeOutputColumns.get(i).getColumnName(), i);
      }
            this.prefix_ = prefix;
   }

      public ListColumnDefinition getInputCols()
      {
         return inputCols;
      }

      public ListColumnDefinition getOutputCols()
      {
         return outputCols;
      }

      public Integer getRuntimeInputIndex (String columnName)
      {
         // remove any output column prefix
         // when looking for corresponding column in the input
         if (columnName.length() != 0
               && columnName.startsWith(prefix_))
         {
            columnName = columnName.split(prefix_)[1];
         }
         return inputColumnNameToIndexMap.get(columnName);
      }

      public Integer getRuntimeOutputIndex (String columnName)
      {
         return outputColumnNameToIndexMap.get(columnName);
      }
   }
}