Example 4: Kmeans2 - Aster Analytics

Teradata Aster® Spark Connector User Guide

Product: Aster Analytics
Release Number: 7.00.00.01
Published: May 2017
Language: English (United States)
Last Update: 2018-04-13
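// Spark imports used by this example. The Aster-Spark API imports
// (AsterSparkFunctionDR, DataRow, and the conversion helpers) are not shown in the source.
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.rdd.RDD

/** The Kmeans2 class defines a new wrapper function that uses the Aster-Spark
 * API and implements the Spark MLlib K-means clustering algorithm.
 * It overrides the run method to call the Spark MLlib K-means function.
 */
class Kmeans2(args: Array[String], name: String, mstr: String = null)
  extends AsterSparkFunctionDR(args, name, mstr) {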
  /**
   * The run method is overridden to implement the Spark MLlib K-means function.
   * @param input The input RDD that reads data from the source. The input
   * RDD is of type RDD[DataRow].
   * @param sparkFunctParams String representing the parameters specific to the
   * function the user is implementing. In this example, sparkFunctParams
   * is a numeric value representing the sampling ratio.
   * @return An RDD containing all the input data plus the predicted value,
   * which in this case is the cluster number.
   */
  override def run(input: RDD[DataRow], sparkFunctParams: String): RDD[DataRow] = {
    // Sample the input data (without replacement, fixed seed 10) to build the model.
    val sampleFraction = sparkFunctParams.trim().toDouble
    val sampleSet = input.sample(false, sampleFraction, 10)
    // Drop the first column (the row key) and convert the rest to a dense vector.
    val vrdd = sampleSet.map { rowArray => getDenseVector(rowArray.tail) }
    // new KMeans() uses the MLlib defaults, including k = 2 clusters.
    val model = new KMeans().run(vrdd)
    // The model is built on the sampled data. Now predict on the full input
    // data set, appending the predicted cluster number to each row.
    val srdd = input.map { rowArray =>
      rowArray :+ model.predict(getDenseVector(rowArray.tail)).toDataField
    }
    srdd
  }
}
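The sample, fit, then predict flow of the run method can be tried without a cluster. The following is a minimal plain-Scala sketch, not the Aster-Spark API: the object SampleFitPredictSketch, its helper names, and the tuple row shape are hypothetical, and a small Lloyd-style k-means stands in for the Spark MLlib KMeans class. It assumes the parameter string is a sampling fraction, as in the example above, and adds a fallback for very small samples that the original code does not have.

```scala
import scala.util.Random

// Plain-Scala stand-in for the sample -> fit -> predict flow of Kmeans2.run.
// All names are hypothetical; no Aster or Spark classes are used.
object SampleFitPredictSketch {
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def dist2(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Index of the centroid closest to p (the "predict" step).
  def predict(centroids: Seq[Point], p: Point): Int =
    centroids.indices.minBy(i => dist2(centroids(i), p))

  // Small Lloyd-style k-means: start from the first k distinct points,
  // then alternate assignment and centroid update for a fixed number of rounds.
  def fit(points: Seq[Point], k: Int, iterations: Int): Seq[Point] = {
    var centroids: Seq[Point] = points.distinct.take(k)
    for (_ <- 1 to iterations) {
      val groups = points.groupBy(p => predict(centroids, p))
      centroids = centroids.indices.map { i =>
        groups.get(i) match {
          case Some(members) =>
            members.transpose.map(_.sum / members.size).toVector
          case None => centroids(i) // keep a centroid that attracted no points
        }
      }
    }
    centroids
  }

  // Rows are (key, features). The result appends the predicted cluster to each row.
  def run(rows: Seq[(Int, Point)], params: String, seed: Long = 10L): Seq[(Int, Point, Int)] = {
    val fraction = params.trim.toDouble
    val rng = new Random(seed)
    // Keep each row with probability `fraction` (sampling without replacement).
    val sample = rows.filter(_ => rng.nextDouble() < fraction)
    // Sketch-only guard (not in the original): use all rows if the sample is too small.
    val training = if (sample.size >= 2) sample else rows
    val centroids = fit(training.map(_._2), k = 2, iterations = 20)
    // Predict on every input row, not only on the sampled ones.
    rows.map { case (key, features) => (key, features, predict(centroids, features)) }
  }
}
```

For example, SampleFitPredictSketch.run(rows, "0.5") fits the centroids on roughly half of the rows and still returns one scored row per input row, which is the same shape the OUTPUTS clause of the query describes (key, feature columns, cluster).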
Query using the RunOnSpark function:
SELECT cluster, count(*), min(v1), max(v1), min(v2), max(v2)
FROM RunOnSpark (
  ON (SELECT i , v1 , v2 FROM points)
  SPARKCODE ('com.teradata.aster.functions.Kmeans2 0.5')
  OUTPUTS ('key int', 'v1 double precision', 'v2 double precision', 'cluster int')
  SPARK_CLUSTER_ID ('spark1-yarn-socket')
  ) sp
  GROUP BY cluster
  ORDER BY cluster;
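To make the aggregation in the outer query concrete, here is a minimal plain-Scala sketch of the same per-cluster summary. The case classes Scored and Summary and the function summarize are hypothetical names chosen to mirror the OUTPUTS clause. The sketch only illustrates what the GROUP BY computes and does not reproduce how Aster evaluates the query.

```scala
// Hypothetical row shapes mirroring the query: OUTPUTS columns in, summary columns out.
object ClusterSummarySketch {
  final case class Scored(key: Int, v1: Double, v2: Double, cluster: Int)
  final case class Summary(cluster: Int, count: Int,
                           minV1: Double, maxV1: Double,
                           minV2: Double, maxV2: Double)

  // Group by cluster, compute count and per-column min/max, then order by cluster.
  def summarize(rows: Seq[Scored]): Seq[Summary] =
    rows.groupBy(_.cluster).toSeq.sortBy(_._1).map { case (cluster, members) =>
      val v1 = members.map(_.v1)
      val v2 = members.map(_.v2)
      Summary(cluster, members.size, v1.min, v1.max, v2.min, v2.max)
    }
}
```

Comparing the min and max of v1 and v2 per cluster gives a quick check of whether the clusters occupy distinct regions of the input space.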