import java.util.ArrayList; import com.asterdata.ncluster.aggregator.DecomposableAggregatorFunction; import com.asterdata.ncluster.aggregator.DecomposableAggregatorRuntimeContract; import com.asterdata.ncluster.sqlmr.OutputInfo; import com.asterdata.ncluster.sqlmr.data.ColumnDefinition; import com.asterdata.ncluster.sqlmr.data.SqlType; import com.asterdata.ncluster.sqlmr.data.ValueHolder; import com.asterdata.ncluster.util.ImmutableList; import com.asterdata.ncluster.sqlmr.data.RowHolder; import com.asterdata.ncluster.sqlmr.data.RowView; /** * An aggregator that computes a sum of integer values. * * Both the final aggregated value and the partially aggregated value are of * bigint type. * * The sum of no rows is 0, not NULL as it would be in the standard SQL sum. */ public class int_sum implements DecomposableAggregatorFunction { /** Partial sum */ private RowHolder sum_; /** * Complete the aggregator contract returning the row schema (int) partial * schema (bigint), and final schema (bigint). This aggregator takes no * arguments. */ public int_sum(DecomposableAggregatorRuntimeContract contract) { // Allocate a row to hold the aggregated value and initialize it. this.sum_ = new RowHolder(SqlType.bigint()); this.sum_.setLongAt(0, 0); // Partial schema ArrayListSqlType partialSchema = new ArrayListSqlType(); partialSchema.add(SqlType.bigint()); contract.setPartialResultSchema(ImmutableList.elementsOf(partialSchema)); // Final (output) type ArrayListColumnDefinition outputColumns = new ArrayListColumnDefinition(); outputColumns.add(new ColumnDefinition("result", SqlType.bigint())); contract.setOutputInfo(new OutputInfo(outputColumns)); contract.complete(); } /** * Aggregate the first attribute (integer) of the input row. */ public void aggregateRow(RowView row) { // skip NULLs if (row.isNullAt(0)) return; this.sum_.setLongAt(0, this.sum_.getLongAt(0) + row.getIntAt(0)); } /** * Aggregate the first attribute (integer) of the input row. */ public void aggregatePartialRow(RowView partialRow) { this.sum_.setLongAt(0, this.sum_.getLongAt(0) + partialRow.getIntAt(0)); } /** * Return sum as final value. */ public ValueHolder getFinalValue() { ValueHolder retVal = new ValueHolder(this.sum_.getColumnTypes().get(0)); this.sum_.getValueAt(0, retVal); return retVal; } /** * Return sum as partial aggregate. */ public RowView getPartialRow() { return this.sum_.clone(); } /** * Reset the aggregate. */ public void reset() { this.sum_.setLongAt(0, 0); } }