Notes on *Machine Learning with Spark*: Spark Streaming in Real-Time Machine Learning
Published: 2019-05-11


So far we have focused on batch data processing: all of our analysis, feature extraction, and model training has been applied to a fixed set of data. This fits naturally with Spark's core abstraction, the RDD, an immutable distributed dataset. Although Spark's transformations and actions let us create new RDDs from existing ones, once an RDD has been created the data it contains never changes.
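
For example, here is a tiny illustrative snippet (assuming an existing SparkContext named sc): a transformation never modifies its input RDD, it simply returns a new one.

// Transformations return new RDDs; the original RDD is untouched.
val prices = sc.parallelize(Seq(9.99, 5.49, 8.95))
val discounted = prices.map(p => p * 0.9) // a new RDD derived from prices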

Our attention has likewise been on batch machine learning models, where the fixed training set is typically represented as an RDD of feature vectors (paired with labels in the supervised learning case).

In this chapter, we will:

introduce the concept of online learning, where the model is trained and updated as new data arrives;

learn how to use Spark Streaming for stream processing;

see how to apply Spark Streaming to online learning.

10.1 Online Learning

In contrast to offline training, online learning is based on making a single, fully incremental pass over the training data in sequence (that is, training on one example at a time). After processing each example, the model makes a prediction for it and is then shown the correct output (for example, the class label in classification, or the true target value in regression). The idea behind online learning is that the model keeps updating itself as new information arrives, rather than being retrained from scratch as in the offline setting.

In certain settings, when the volume of data is very large, or when the process generating the data changes rapidly, online learning methods can adapt in near real time, without the expensive retraining that the offline approach requires.
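
To make this concrete, here is a minimal sketch (my addition, not from the book) of the per-example update at the heart of online learning with stochastic gradient descent for linear regression: predict with the current weights, observe the error, and nudge the weights against the gradient.

// One online SGD step for squared-error linear regression.
// Called once per incoming (features, label) example.
def onlineUpdate(weights: Array[Double], features: Array[Double],
                 label: Double, stepSize: Double): Array[Double] = {
  val prediction = weights.zip(features).map { case (w, x) => w * x }.sum
  val error = prediction - label
  // the squared-error gradient for a single example is error * x
  weights.zip(features).map { case (w, x) => w - stepSize * error * x }
}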

Spark Streaming Example 1

The message producer

package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

/**
  * A producer that randomly generates "product activity" events,
  * at most 5 per second, and sends them over a network connection.
  */
object StreamingProducer {
  def main(args: Array[String]): Unit = {
    val random = new Random()
    val MaxEvents = 6 // exclusive upper bound for nextInt: 0 to 5 events per second

    // read the list of user names from the names.csv resource
    val nameResource = this.getClass.getResourceAsStream("names.csv")
    val names = scala.io.Source.fromInputStream(nameResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq
    println(names)

    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate n random product events as (user, product, price) triples. */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run() = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          socket.close()
        }
      }.start()
    }
  }
}
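
Once the producer is running, you can sanity-check its output with nc localhost 9999, which should print a handful of comma-separated user,product,price events every second before any Spark code is involved.
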
A simple streaming application

package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
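
With the producer from the previous section running, this app prints the first ten records of each 10-second batch to the console. Note the local[2] master: the socket receiver permanently occupies one thread, so at least two local threads are needed for the batches to actually be processed.
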
Streaming analytics

package cn.edu.shu.ces.chenjie.streamingapp

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingAnalyticsApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // parse each comma-separated record into a (user, product, price) triple
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2))
    }

    // compute summary statistics over each batch's RDD
    events.foreachRDD { (rdd, time) =>
      val numPurchases = rdd.count()
      val uniqueUsers = rdd.map { case (user, _, _) => user }.distinct().count()
      val totalRevenue = rdd.map { case (_, _, price) => price.toDouble }.sum()
      // sort descending by count so the first element is the most popular product
      val productsByPopularity = rdd
        .map { case (user, product, price) => (product, 1) }
        .reduceByKey(_ + _)
        .collect()
        .sortBy(-_._2)
      val mostPopular = productsByPopularity(0)

      val formatter = new SimpleDateFormat()
      val dateStr = formatter.format(new Date(time.milliseconds))
      println(s"== Batch start time: $dateStr ==")
      println("Total purchases: " + numPurchases)
      println("Unique users: " + uniqueUsers)
      println("Total revenue: " + totalRevenue)
      println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
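
Because foreachRDD exposes the underlying RDD of each batch, all of the usual Spark operations (count, distinct, reduceByKey, and so on) are available within a batch. These statistics are computed per batch and carry no state across batches; keeping running totals is the subject of the next example.
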
Stateful streaming

package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._

  // fold the current batch of (product, price) pairs for a user into that
  // user's running (number of purchases, total revenue) state
  def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    // stateful operations require a checkpoint directory
    ssc.checkpoint("/tmp/sparkstreaming/")
    val stream = ssc.socketTextStream("localhost", 9999)

    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2).toDouble)
    }

    val users = events.map { case (user, product, price) => (user, (product, price)) }
    val revenuePerUser = users.updateStateByKey(updateState)
    revenuePerUser.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
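
updateStateByKey invokes updateState for each key on every batch, folding that batch's new (product, price) values into the running (purchase count, revenue) state for the corresponding user. The checkpoint directory is required for stateful operations because checkpointing periodically truncates the lineage that the accumulated state would otherwise grow without bound.
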
Online learning with Spark Streaming

Streaming regression
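
MLlib provides a streaming regression model, StreamingLinearRegressionWithSGD, which exposes trainOn to update the model on each batch of a labeled stream and predictOn to produce predictions for a stream of feature vectors. We first build a producer that emits random linear regression data, then fit a streaming model to that stream.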

Creating a streaming data producer

package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Random

import breeze.linalg.DenseVector

/**
  * A producer that generates random linear regression data.
  */
object StreamingModelProducer {
  def main(args: Array[String]): Unit = {
    val MaxEvents = 10 // maximum number of events per second
    val NumFeatures = 100
    val random = new Random()

    /** Generate an array of normally distributed random numbers. */
    def generateRandomArray(n: Int) = Array.tabulate(n)(n => random.nextGaussian())

    // fixed, known model weights and intercept used to generate the data
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10

    /**
      * Generate n random data events of the form (target, feature vector).
      * @param n number of events
      */
    def generateNoisyData(n: Int) = {
      (1 to n).map { i =>
        val x = new DenseVector(generateRandomArray(NumFeatures))
        val y: Double = w.dot(x)
        val noisy = y + intercept
        (noisy, x)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run() = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val data = generateNoisyData(num)
            data.foreach { case (y, x) =>
              val xStr = x.data.mkString(",")
              val eventStr = s"$y\t$xStr"
              out.write(eventStr)
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          socket.close()
        }
      }.start()
    }
  }
}
Creating a streaming regression model

package cn.edu.shu.ces.chenjie.streamingapp

import breeze.linalg._

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingModel {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // must match the feature dimension used by StreamingModelProducer
    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    // parse each "label \t comma-separated features" record into a LabeledPoint
    val labeledStream = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    model.trainOn(labeledStream)
    // predictOn expects a stream of feature vectors, so strip the labels
    model.predictOn(labeledStream.map(_.features)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
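
To check that the model is actually converging toward the producer's weights, it helps to compare each batch's true targets with the current model's predictions. Below is a minimal sketch (my addition, not from the original post) that prints a per-batch mean squared error; it assumes the model and labeledStream values defined above and would be placed before ssc.start(). latestModel() returns the model as trained up to the most recent batch.

// Pair each true label with the current model's prediction
// and print the mean squared error for every batch.
val predsAndTrue = labeledStream.transform { rdd =>
  val latest = model.latestModel()
  rdd.map(point => (point.label, latest.predict(point.features)))
}
predsAndTrue.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    val mse = rdd.map { case (y, p) => math.pow(y - p, 2.0) }.mean()
    println(s"Batch $time MSE: $mse")
  }
}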

