创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)

系列文章目录 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项

系列文章目录

  1. 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)
  2. 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
  3. 项目主要效果展示——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(三)
  4. 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
  5. 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)
  6. 创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)
  7. 离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)
  8. 实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)
  9. 综合业务服务与用户可视化建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(九)
  10. 程序部署与运行——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(十)

项目资源下载

  1. 电影推荐系统网站项目源码Github地址(可Fork可Clone)
  2. 电影推荐系统网站项目源码Gitee地址(可Fork可Clone)
  3. 电影推荐系统网站项目源码压缩包下载(直接使用)
  4. 电影推荐系统网站项目源码所需全部工具合集打包下载(spark、kafka、flume、tomcat、azkaban、elasticsearch、zookeeper)
  5. 电影推荐系统网站项目源数据(可直接使用)
  6. 电影推荐系统网站项目个人原创论文
  7. 电影推荐系统网站项目前端代码
  8. 电影推荐系统网站项目前端css代码

文章目录


前言

  今天给大家带来的博文是关于代码项目的初始化以及整个项目所需数据的初始化,其中包括,在 I D E A IDEA IDEA中创建 m a v e n maven maven项目、数据加载准备、数据初始化到 M o n g o D B MongoDB MongoDB、数据初始化到 E l a s t i c S e a r c h ElasticSearch ElasticSearch等内容,通过这篇博文我们就可以把整个项目的框架搭建起来了。另外有一点很重要,关于代码的内容大家要注意可能和我的命名不同,当然允许不同,但是要注意修改相关的位置,相信能做到这里的读者应该都是有一定基础的,但还是要提醒一下,需要注意。当然,读者还是要有Scala和Maven的基础。下面就开始今天的学习吧!


一、在 I D E A IDEA IDEA中创建 M a v e n Maven Maven项目

  项目主体用 S c a l a Scala Scala编写,采用 I D E A IDEA IDEA作为开发环境进行项目编写,采用 M a v e n Maven Maven作为项目构建和管理工具
  首先打开 I D E A IDEA IDEA,创建一个 M a v e n Maven Maven项目,命名为MovieRecommendSystem。为了方便后期的联调,会把业务系统的代码也添加进来,所以可以以MovieRecommendSystem作为父项目,并在其下建一个名为recommender的子项目,然后再在下面搭建多个子项目用于提供不同的推荐服务。

1.1 项目框架搭建

  在MovieRecommendSystem的pom.xml文件中加入元素pom,然后新建一个maven module。子项目的第一步是初始化业务数据,所以子项目命名为DataLoader
  父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以MovieRecommendSystem和recommender下的src文件夹都可以删掉
  目前的整体项目框架如下:
在这里插入图片描述

1.2 声明项目中工具的版本信息

  整个项目需要用到多个工具,它们的不同版本可能会对程序运行造成影响,所以应该在最外层的MovieRecommendSystem中声明所有子项目共用的版本信息。在MovieRecommendSystem/pom.xml中加入以下配置:

<properties><log4j.version>1.2.17log4j.version><slf4j.version>1.7.22slf4j.version><mongodb-spark.version>2.0.0mongodb-spark.version><casbah.version>3.1.1casbah.version><elasticsearch-spark.version>5.6.2elasticsearch-spark.version><elasticsearch.version>5.6.2elasticsearch.version><redis.version>2.9.0redis.version><kafka.version>0.10.2.1kafka.version><spark.version>2.1.1spark.version><scala.version>2.11.8scala.version><jblas.version>1.2.1jblas.version>
properties>

1.3 添加项目依赖

  首先,对于整个项目而言,应该有同样的日志管理,在MovieRecommendSystem中引入公有依赖:

<dependencies><!—引入共同的日志管理工具 --><dependency><groupId>org.slf4jgroupId><artifactId>jcl-over-slf4jartifactId><version>${slf4j.version}version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-apiartifactId><version>${slf4j.version}version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-log4j12artifactId><version>${slf4j.version}version>dependency><dependency><groupId>log4jgroupId><artifactId>log4jartifactId><version>${log4j.version}version>dependency>
dependencies>

  同样,对于maven项目的构建,可以引入公有的插件:

<build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><version>3.6.1version><configuration><source>1.8source><target>1.8target>configuration>plugin>plugins><pluginManagement><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-assembly-pluginartifactId><version>3.0.0version><executions><execution><id>make-assemblyid><phase>packagephase><goals><goal>singlegoal>goals>execution>executions>plugin><plugin><groupId>net.alchim31.mavengroupId><artifactId>scala-maven-pluginartifactId><version>3.2.2version><executions><execution><goals><goal>compilegoal><goal>testCompilegoal>goals>execution>executions>plugin>plugins>pluginManageme>
build>

  然后,在MovieRecommendSystem/recommender/ pom.xml模块中,可以为所有的推荐模块声明spark相关依赖(这里的dependencyManagement表示仅声明相关信息,子项目如果依赖需要自行导入)

<dependencyManagement><dependencies><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-core_2.11artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-sql_2.11artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_2.11artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-mllib_2.11artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-graphx_2.11artifactId><version>${spark.version}version>dependency><dependency><groupId>org.scala-langgroupId><artifactId>scala-libraryartifactId><version>${scala.version}version>dependency>dependencies>
dependencyManagement>

  由于各推荐模块都是scala代码,还应该引入scala-maven-plugin插件,用于scala程序的编译。因为插件已经在父项目中声明,所以这里不需要再声明版本和具体配置:

<build><plugins><plugin><groupId>net.alchim31.mavengroupId><artifactId>scala-maven-pluginartifactId>plugin>plugins>
build>

  对于具体的DataLoader子项目,需要spark相关组件,还需要mongodb的相关依赖,在MovieRecommendSystem/recommender/DataLoader/pom.xml文件中引入所有依赖(在父项目中已声明的不需要再加详细信息):

<dependencies><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-core_2.11artifactId>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-sql_2.11artifactId>dependency><dependency><groupId>org.scala-langgroupId><artifactId>scala-libraryartifactId>dependency><dependency><groupId>org.mongodbgroupId><artifactId>casbah-core_2.11artifactId><version>${casbah.version}version>dependency><dependency><groupId>org.mongodb.sparkgroupId><artifactId>mongo-spark-connector_2.11artifactId><version>${mongodb-spark.version}version>dependency><dependency><groupId>org.elasticsearch.clientgroupId><artifactId>transportartifactId><version>${elasticsearch.version}version>dependency><dependency><groupId>org.elasticsearchgroupId><artifactId>elasticsearch-spark-20_2.11artifactId><version>${elasticsearch-spark.version}version><exclusions><exclusion><groupId>org.apache.hivegroupId><artifactId>hive-serviceartifactId>exclusion>exclusions>dependency>
dependencies>

  至此,做数据加载需要的依赖都已配置好,可以开始写代码了

二、数据加载准备

  在src/main/目录下,可以看到已有的默认源文件目录是java,可以将其改名为scala。将数据文件movies.csv、ratings.csv、tags.csv复制到资源文件目录src/main/resources下,将从这里读取数据并加载到Mongodb和Elasticsearch中

2.1 M o v i e s Movies Movies数据集

  数据格式如下:

mid,name,descri,timelong,issue,shoot,language,genres,actors,directors

  例子如下:

1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English
^Adventure|Animation|Children|Comedy|Fantasy ^Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn ^John Lasseter

  Movie数据集有10个字段,每个字段之间通过“^”符号进行分割

字段名字段类型字段描述字段备注
midInt电影的ID
nameString电影的名称
descriString电影的描述
timelongString电影的时长
shootString电影拍摄时间
issueString电影发布时间
languageArray[String]电影语言每一项用“|”分割
genresArray[String]电影所属类别每一项用“|”分割
directorArray[String]电影的导演每一项用“|”分割
actorsArray[String]电影的演员每一项用“|”分割

2.2 R a t i n g s Ratings Ratings数据集

  数据格式如下:

userId,movieId,rating,timestamp

  例子如下:

1,31,2.5,1260759144

  Rating数据集有4个字段,每个字段之间通过“,”符号进行分割

字段名字段类型字段描述字段备注
uidInt用户的ID
midInt电影的ID
scoreDouble电影的分值
timestampLong评分的时间

2.3 T a g Tag Tag数据集

  数据格式如下:

userId,movieId,tag,timestamp

  例子如下:

1,31,action,1260759144

  Tag数据集有4个字段,每个字段之间通过“,”符号进行分割

字段名字段类型字段描述字段备注
uidInt用户的ID
midInt电影的ID
tagString电影的标签
timestampLong评分的时间

2.4 日志管理配置文件

  log4j对日志的管理,需要通过配置文件来生效。在src/main/resources下新建配置文件log4j.properties,写入以下内容:

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

三、数据初始化到 M o n g o D B MongoDB MongoDB

3.1 启动 M o n g o D B MongoDB MongoDB数据库

[bigdata@linux mongodb]$ bin/mongod -config data/mongodb.conf

3.2 数据加载程序主体实现

  为原始数据定义几个样例类,通过SparkContext的textFile方法从文件中读取数据,并转换成DataFrame,再利用Spark SQL提供的write方法进行数据的分布式插入
  在DataLoader/src/main/scala下新建package,命名为com.IronmanJay.recommender,新建名为DataLoader的scala class文件,也就是:DataLoader/src/main/scala/com.IronmanJay.recommerder/DataLoader.scala
  程序主体代码如下:

/*** Movie 数据集** 260                                         电影ID,mid* Star Wars: Episode IV - A New Hope (1977)   电影名称,name* Princess Leia is captured and held hostage  详情描述,descri* 121 minutes                                 时长,timelong* September 21, 2004                          发行时间,issue* 1977                                        拍摄时间,shoot* English                                     语言,language* Action|Adventure|Sci-Fi                     类型,genres* Mark Hamill|Harrison Ford|Carrie Fisher     演员表,actors* George Lucas                                导演,directors**/
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,shoot: String, language: String, genres: String, actors: String, directors: String)/*** Rating数据集** 1,31,2.5,1260759144*/
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)/*** Tag数据集** 15,1955,dentist,1193435061*/
case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)// 把mongo和es的配置封装成样例类/**** @param uri MongoDB连接* @param db  MongoDB数据库*/
case class MongoConfig(uri: String, db: String)/**** @param httpHosts      http主机列表,逗号分隔* @param transportHosts transport主机列表* @param index          需要操作的索引* @param clustername    集群名称,默认elasticsearch*/
case class ESConfig(httpHosts: String, transportHosts: String, index: String, clustername: String)object DataLoader {// 定义常量val MOVIE_DATA_PATH = "D:\\Software\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\movies.csv"val RATING_DATA_PATH = "D:\\Software\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"val TAG_DATA_PATH = "D:\\Software\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\tags.csv"val MONGODB_MOVIE_COLLECTION = "Movie"val MONGODB_RATING_COLLECTION = "Rating"val MONGODB_TAG_COLLECTION = "Tag"val ES_MOVIE_INDEX = "Movie"def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://linux:27017/recommender","mongo.db" -> "recommender","es.httpHosts" -> "linux:9200","es.transportHosts" -> "linux:9300","es.index" -> "recommender","es.cluster.name" -> "es-cluster")// 创建一个sparkConfval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")// 创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._// 加载数据val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)val movieDF = movieRDD.map(item => {val attr = item.split("\\^")Movie(attr(0).toInt, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim, attr(5).trim, attr(6).trim, attr(7).trim, attr(8).trim, attr(9).trim)}).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)val ratingDF = ratingRDD.map(item => {val attr = item.split(",")Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)}).toDF()val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)//将tagRDD装换为DataFrameval tagDF = tagRDD.map(item => {val attr = item.split(",")Tag(attr(0).toInt, attr(1).toInt, attr(2).trim, attr(3).toInt)}).toDF()implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))// 将数据保存到MongoDBstoreDataInMongoDB(movieDF, ratingDF, tagDF)// 数据预处理,把movie对应的tag信息添加进去,加一列 tag1|tag2|tag3...import org.apache.spark.sql.functions._/*** mid, tags** tags: tag1|tag2|tag3...*/val newTag = tagDF.groupBy($"mid").agg(concat_ws("|", collect_set($"tag")).as("tags")).select("mid", "tags")// newTag和movie做join,数据合并在一起,左外连接val movieWithTagsDF = movieDF.join(newTag, Seq("mid"), "left")implicit val esConfig = ESConfig(config("es.httpHosts"), config("es.transportHosts"), config("es.index"), config("es.cluster.name"))// 保存数据到ESstoreDataInES(movieWithTagsDF)spark.stop()}
}

3.3 将数据写入 M o n g o D B MongoDB MongoDB

  接下来,实现storeDataInMongo方法,将数据写入Mongodb中:

def storeDataInMongoDB(movieDF: DataFrame, ratingDF: DataFrame, tagDF: 
DataFrame)(implicit mongoConfig: MongoConfig): Unit = {// 新建一个mongodb的连接val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))// 如果mongodb中已经有相应的数据库,先删除mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()// 将DF数据写入对应的mongodb表中movieDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_MOVIE_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()ratingDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()tagDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_TAG_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()//对数据表建索引mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient.close()}

四、数据初始化到 E l a s t i c S e a r c h ElasticSearch ElasticSearch

4.1 启动 E l a s t i c S e a r c h ElasticSearch ElasticSearch数据库

  这里有一个小坑,如果有的读者习惯使用root用户,那么启动ElasticSearch的时候要切换为非root用户,并且这个非root用户要被授权,关闭的时候也如此,否则会出现打不开或者其他问题的现象。启动ElasticSearch的命令如下:

[bigdata@linux elasticsearch-5.6.2]$ ./bin/elasticsearch -d

4.2 将数据写入 E l a s t i c S e a r c h ElasticSearch ElasticSearch

  与上节类似,同样主要通过Spark SQL提供的write方法进行数据的分布式插入,实现storeDataInES方法:

def storeDataInES(movieDF: DataFrame)(implicit eSConfig: ESConfig): Unit= {// 新建es配置val settings: Settings = Settings.builder().put("cluster.name", eSConfig.clustername).build()// 新建一个es客户端val esClient = new PreBuiltTransportClient(settings)val REGEX_HOST_PORT = "(.+):(\\d+)".reSConfig.transportHosts.split(",").foreach {case REGEX_HOST_PORT(host: String, port: String) => {esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))}}// 先清理遗留的数据if (esClient.admin().indices().exists(new IndicesExistsRequest(eSConfig.index)).actionGet().isExists) {esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))}esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))movieDF.write.option("es.nodes", eSConfig.httpHosts).option("es.http.timeout", "100m").option("es.mapping.id", "mid")
.option("es.nodes.wan.only", "true").mode("overwrite").format("org.elasticsearch.spark.sql").save(eSConfig.index + "/" + ES_MOVIE_INDEX)
}

总结

  最近有点忙,考研复试结束之后再到拟录取之后,一直在摆(bushi),所以把这个系列给搁置了,刚刚抽出空给这次补上了,后面我争取一周一篇(立一个小小的flag)。也请大家一起努力,加油,我在下篇博客等你!哦,对了,这篇博客因为有代码,所以有些小细节大家一定要注意,比如命名之类的,一定注意!