本文共 7736 字,大约阅读时间需要 25 分钟。
此前我们一直讨论的是批量数据处理,也就是我们所有的分析、特征提取和模型训练都被应用于一组固定不变的数据。这很好地适用于Spark对RDD的核心抽象,即不可变的分布式数据集。尽管可以使用Spark的转换函数和行动算子从原始的RDD创建新RDD,但是RDD一旦创建,其中包含的数据就不会改变。
我们的注意力一直集中于批量机器学习模型,训练模型的固定训练集通常表示为一个特征向量(在监督学习模型的例子中是标签)的RDD。
在本章,我们将
介绍在线学习的概念,当新的数据出现时,模型将被训练和更新。
学习使用Spark Streaming 做流处理
如何将Spark Streaming 应用于在线学习
10.1、在线学习
相比于离线计算,在线学习是以对训练数据通过完全增量的形式顺序处理一遍为基础(就是说,一次只训练一个样例)。在处理每一个样本时,模型先对该样例做出预测,随后才得到该样例的正确输出(例如分类的标签或者回归的真实目标值)。在线学习背后的想法就是模型随着接收到新的消息不断更新自己,而不是像离线训练那样一次次重新训练。
在流式场景下,当数据量很大,或者生成数据的过程快速变化的时候,在线学习方法可以做出接近实时的快速响应,而不需要离线学习中昂贵的重新训练。
Spark Streaming例子1
消息生成端
package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

/**
 * Generator of random "product purchase" events.
 * For every client that connects on TCP port 9999, a dedicated thread emits
 * up to 5 events per second, one CSV line ("user,product,price") per event.
 */
object StreamingProducer {
  def main(args: Array[String]): Unit = {
    val random = new Random()
    // random.nextInt(MaxEvents) yields 0..5, i.e. at most 5 events per second
    val MaxEvents = 6
    // names.csv: a single line of comma-separated user names on the classpath
    val nameResource = this.getClass.getResourceAsStream("names.csv")
    val names = scala.io.Source.fromInputStream(nameResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq
    println(names)

    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate n random (user, product, price) events. */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        // pick one user uniformly at random; indexing is O(1) versus the
        // original random.shuffle(names).head which shuffled the whole list
        val user = names(random.nextInt(names.size))
        (user, product, price)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port : 9999")
    while (true) {
      val socket = listener.accept()
      // one producer thread per client; it streams events forever
      new Thread() {
        override def run() = {
          println("Got client connected from : " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          // NOTE(review): the original had `socket.close()` here, which was
          // unreachable dead code after the infinite loop — removed
        }
      }.start()
    }
  }
}
一个简单的流处理
package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Minimal Spark Streaming app: connects to the event producer on
 * localhost:9999 and prints each 10-second batch of raw text lines.
 */
object SimpleStreamingApp {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver, one for processing
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
流式分析
package cn.edu.shu.ces.chenjie.streamingapp

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Per-batch analytics over the purchase-event stream: number of purchases,
 * unique users, total revenue, and the most popular product.
 */
object StreamingAnalyticsApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    // each record is "user,product,price"
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2))
    }
    events.foreachRDD { (rdd, time) =>
      val numPurchases = rdd.count()
      val uniqueUsers = rdd.map { case (user, _, _) => user }.distinct().count()
      val totalRevenue = rdd.map { case (_, _, price) => price.toDouble }.sum()
      // FIX: sort DESCENDING by count so that element 0 really is the most
      // popular product — the original sorted ascending (`sortBy(_._2)`) and
      // then printed the LEAST popular product as "Most popular"
      val productsByPopularity = rdd.map { case (user, product, price) => (product, 1) }
        .reduceByKey(_ + _)
        .collect()
        .sortBy(-_._2)
      val mostPopular = productsByPopularity(0)
      val formatter = new SimpleDateFormat()
      val dateStr = formatter.format(new Date(time.milliseconds))
      println(s"== Batch start time: $dateStr ==")
      println("Total purchases: " + numPurchases)
      println("Unique users: " + uniqueUsers)
      println("Total revenue: " + totalRevenue)
      println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
有状态的流计算
package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Stateful stream processing: maintains a running (purchase count, revenue)
 * total per user via updateStateByKey.
 */
object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._

  /**
   * Fold one user's (product, price) pairs from the current batch into the
   * running (number of purchases, total revenue) state for that user.
   */
  def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    // first batch for this user: start from (0, 0.0)
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    // stateful operations require a checkpoint directory
    ssc.checkpoint("/tmp/sparkstreaming/")
    val stream = ssc.socketTextStream("localhost", 9999)
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2).toDouble)
    }
    // key by user so updateStateByKey accumulates per user
    val users = events.map { case (user, product, price) => (user, (product, price)) }
    val revenuePerUser = users.updateStateByKey(updateState)
    revenuePerUser.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
使用Spark Streaming进行在线学习
流回归
创建流数据生成器
package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Random

import breeze.linalg.DenseVector

/**
 * Generator of random linear-regression training data. For every client that
 * connects on TCP port 9999, a thread emits up to 10 labelled examples per
 * second as "label\tf1,f2,...,f100" lines.
 */
object StreamingModelProducer {
  def main(args: Array[String]): Unit = {
    val MaxEvents = 10 // maximum number of events per second
    val NumFeatures = 100
    val random = new Random()

    /** An array of n draws from a standard normal distribution. */
    def generateRandomArray(n: Int) = Array.tabulate(n)(n => random.nextGaussian())

    // fixed (hidden) weight vector and intercept defining the target function
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10

    /**
     * Generate n random (label, feature vector) data events.
     * NOTE(review): despite the name, no noise term is actually added — the
     * label is exactly w.dot(x) + intercept. Behavior kept as-is; only the
     * `nosiy` typo was fixed.
     */
    def generateNoisyData(n: Int) = {
      (1 to n).map { i =>
        val x = new DenseVector(generateRandomArray(NumFeatures))
        val y: Double = w.dot(x)
        val noisy = y + intercept
        (noisy, x)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port : 9999")
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run() = {
          println("Got client connected from :" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val data = generateNoisyData(num)
            data.foreach { case (y, x) =>
              val xStr = x.data.mkString(",")
              val eventStr = s"$y\t$xStr"
              out.write(eventStr)
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          // NOTE(review): the original had `socket.close()` here, which was
          // unreachable dead code after the infinite loop — removed
        }
      }.start()
    }
  }
}
创建流回归模型
package cn.edu.shu.ces.chenjie.streamingapp

import breeze.linalg._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Online linear regression with SGD: trains on (and predicts for) the
 * labelled stream emitted by StreamingModelProducer on localhost:9999.
 */
object SimpleStreamingModel {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // FIX: must match the producer's feature dimension. StreamingModelProducer
    // emits 100 features per example; the original value of 10 made the
    // initial weight vector's size disagree with the incoming data.
    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    // each line: "label\tcomma-separated features"
    val labeledStream = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    model.trainOn(labeledStream)
    model.predictOn(labeledStream)
    ssc.start()
    ssc.awaitTermination()
  }
}
转载地址:http://suqrb.baihongyu.com/