/*
 * Table operator: aggregates integer columns of input stream 0 into a single
 * output row on output stream 0.
 *
 * For every output column i, the input column at position index[i] (indices
 * are read from the contract context) is summed over all input rows. Input
 * values of type BYTEINT, SMALLINT, INTEGER and BIGINT are accumulated as
 * double; any other datatype contributes nothing. If the first character of
 * the output column's `column` field is 'A', the sum is divided by the number
 * of rows read (when at least one row was read). Each result is bound to the
 * output row as a double and exactly one output row is written.
 *
 * NOTE(review): the contract buffer is assumed to hold at least one int per
 * output column - confirm against the contract function.
 * NOTE(review): results of FNC_malloc are used unchecked, as in the original.
 */
void udaggregation ()
{
    int null_ind, length;
    FNC_TblOpHandle_t *Input_handle;      // input stream handle
    FNC_TblOpHandle_t *Output_handle;     // output stream handle
    FNC_TblOpColumnDef_t *Input_columns;  // input stream column definitions
    FNC_TblOpColumnDef_t *Output_columns; // output stream column definitions
    double *aggregates;                   // one running sum per output column
    int *index;                           // input column index for each aggregate
    int rowcount = 0;                     // number of input rows read
    int i, tmp;
    BYTE *ptr;
    int colcount;
    int in_colcount;

    /* Allocate and initialize the output column definitions */
    colcount = FNC_TblOpGetColCount(0, 'W');
    Output_columns = FNC_malloc ( TblOpSIZECOLDEF( colcount ) );
    TblOpINITCOLDEF(Output_columns, colcount);
    FNC_TblOpGetColDef(0, 'W', Output_columns);

    /* Allocate and initialize the input column definitions */
    in_colcount = FNC_TblOpGetColCount(0, 'R');
    Input_columns = FNC_malloc ( TblOpSIZECOLDEF( in_colcount ) );
    TblOpINITCOLDEF( Input_columns, in_colcount );
    FNC_TblOpGetColDef(0, 'R', Input_columns);

    /*
     * Initialize the aggregated values for the group.
     * The element size must be that of a double: sizing the buffer with
     * sizeof(float) allocated only half of what the loops below access.
     */
    aggregates = FNC_malloc( sizeof *aggregates * Output_columns->num_columns );
    for (i = 0; i < Output_columns->num_columns; i++)
    {
        aggregates[i] = 0;
    }

    /* Get the input column indices from the contract context */
    index = FNC_malloc( FNC_TblOpGetContractLength() );
    FNC_TblOpGetContractDef(index, FNC_TblOpGetContractLength(), &tmp );

    /* Open both streams, then iterate over every input row */
    Input_handle = FNC_TblOpOpen(0, 'R', TBLOP_NOOPTIONS);  // start iterator for input stream
    Output_handle = FNC_TblOpOpen(0, 'W', TBLOP_NOOPTIONS); // start iterator for output stream
    while ( FNC_TblOpRead(Input_handle) == TBLOP_SUCCESS )
    {
        rowcount++;

        /* Add the current row's value to each column's running sum */
        for (i = 0; i < Output_columns->num_columns; i++)
        {
            /*
             * NOTE(review): null_ind is not inspected before ptr is
             * dereferenced; confirm how NULL attributes are reported and
             * whether they should be skipped.
             */
            FNC_TblOpGetAttributeByNdx(Input_handle, index[i], (void **) &ptr, &null_ind, &length);
            switch (Input_columns->column_types[index[i]].datatype)
            {
                case BYTEINT_DT:  aggregates[i] += *((signed char *) ptr); break;
                case SMALLINT_DT: aggregates[i] += *((short *) ptr);       break;
                case INTEGER_DT:  aggregates[i] += *((int *) ptr);         break;
                case BIGINT_DT:   aggregates[i] += *((long long *) ptr);   break;
                default:
                    /* other datatypes are not aggregated */
                    break;
            }
        }
    }

    /* Set the output values */
    for (i = 0; i < Output_columns->num_columns; i++)
    {
        /* Columns marked with a leading 'A' receive the mean instead of the sum */
        if (Output_columns->column_types[i].column[0] == 'A' && rowcount != 0)
        {
            aggregates[i] = aggregates[i] / ((double) rowcount);
        }
        FNC_TblOpBindAttributeByNdx(Output_handle, i, aggregates+i, 0, sizeof(double));
    }

    /* Write the single output row and close both streams */
    FNC_TblOpWrite(Output_handle);
    FNC_TblOpClose(Input_handle);
    FNC_TblOpClose(Output_handle);

    /* Release memory */
    FNC_free(Output_columns);
    FNC_free(Input_columns);
    FNC_free(aggregates);
    FNC_free(index);
}