BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

使用Spark Streaming进行情感分析

| 作者 侠天 关注 5 他的粉丝 发布于 2016年5月25日. 估计阅读时间: 10 分钟 | QCon上海2018 关注大数据平台技术选型、搭建、系统迁移和优化的经验。

这里将使用Twitter流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

val ssc = new StreamingContext(sc, Seconds(5))

System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

val stream = TwitterUtils.createStream(ssc, None)

这里创建一个Spark Context sc,设置日志级别为WARN来消除Spark生成的日志。使用sc创建Streaming Contextssc,然后设置 Twitter证书来获得 Twitter网站数据。

Twitter上现在的趋势是什么?

很容易的能够找到任意给定时刻的Twitter趋势,仅仅需要计算数据流每个标签的数目。让我们看下Spark如何实现这个操作的。

val tags = stream.flatMap { status =>
   status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
   .foreachRDD { rdd =>
       val now = org.joda.time.DateTime.now()
       rdd
         .sortBy(_._2)
         .map(x => (x, now))
         .saveAsTextFile(s"~/twitter/$now")
     }

首先从Tweets获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对Tweets的大数据和食物两个不相关的主题进行情感分析。

有几种API可以在Tweets上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过Streaming过滤一定的哈希标签,只选择感兴趣的Tweets,如下所示:

val tweets = stream.filter {t =>
     val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
     tags.contains("#bigdata") && tags.contains("#food")
   }

得到Tweets上所有标签,然后标记出#bigdata和 #food两个标签。
接下来定一个函数从Tweets抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE

然后对detectSentiment进行测试以确保其可以工作:

it("should detect not understood sentiment") {
     detectSentiment("") should equal (NOT_UNDERSTOOD)
}

it("should detect a negative sentiment") {
     detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}

it("should detect a neutral sentiment") {
     detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}

it("should detect a positive sentiment") {
     detectSentiment("It was a nice experience.") should equal (POSITIVE)
}

it("should detect a very positive sentiment") {
     detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

val data = tweets.map { status =>
   val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
   val tags = status.getHashtagEntities.map(_.getText.toLowerCase)

   (status.getText, sentiment.toString, tags)
}

data中包含相关的情感。

和SQL协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用SQL查询。
具体操作如下:

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

data.foreachRDD { rdd =>
   rdd.toDF().registerTempTable("sentiments")
}

将Dstream转换成DataFrame,然后注册成一个临时表,其他喜欢使用SQL的同事就可以使用不同的数据源啦。

sentiment表可以被任意查询,也可以使用Spark SQL和其他数据源(比如,Cassandra数据等)进行交叉查询。
查询DataFrame的列子:

sqlContext.sql("select * from sentiments").show()

窗口操作

Spark Streaming的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要checkpoint流数据,具体详情见http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

tags
   .window(Minutes(1))
   . (...)

结论

此列子虽然简单,但是其可以使用Spark解决实际问题。我们可以计算Twitter上主题趋势。

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我
社区评论

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT