from pyspark.sql.functions import col
import altair as alt
import pandas as pd
from matplotlib import pyplot as plt
# Render matplotlib figures inline in the notebook output.
get_ipython().run_line_magic('matplotlib', 'inline')

# Source CSVs on HDFS: the movie<->tag link table and the tag dimension table.
movie_tags_path = "hdfs://localhost:9000/data2/porn_data_movie_tags.csv"
tags_path = "hdfs://localhost:9000/data2/porn_data_tag.csv"

# Read both files, treating the first line of each as the column header.
csv = spark.read.option("header", True).csv(movie_tags_path)
tag_csv = spark.read.option("header", True).csv(tags_path)

# Preview the first rows of the link table.
csv.show()
+---+--------+------+ | id|movie_id|tag_id| +---+--------+------+ | 1| 9909| 1| | 2| 9909| 2| | 3| 9909| 3| | 4| 9909| 4| | 5| 9910| 5| | 6| 9910| 6| | 7| 9910| 7| | 8| 9910| 8| | 9| 9910| 9| | 10| 9910| 10| | 11| 9911| 12| | 12| 9911| 2| | 13| 9911| 1| | 14| 9911| 13| | 15| 9910| 11| | 16| 9911| 14| | 17| 9911| 15| | 18| 9911| 5| | 19| 9910| 16| | 20| 9910| 17| +---+--------+------+ only showing top 20 rows
# Print the schema of the movie-tag link DataFrame as loaded from CSV.
csv.printSchema()
root |-- id: string (nullable = true) |-- movie_id: string (nullable = true) |-- tag_id: string (nullable = true)
from pyspark.sql.functions import col, desc, lit
# Convert tag_id from its CSV string form to an integer column.
tag_id_as_int = col("tag_id").cast("Integer")
csv = csv.withColumn("tag_id", tag_id_as_int)

# Attach a constant 1 to every row in a column named "count".
csv = csv.withColumn("count", lit(1))

# Show the schema after the cast and the added column.
csv.printSchema()
root |-- id: string (nullable = true) |-- movie_id: string (nullable = true) |-- tag_id: integer (nullable = true) |-- count: integer (nullable = false)
# Inner-join every movie-tag link row with its tag record (tag_id == tag.id).
# Despite its name, tag_rdd is a DataFrame, not an RDD.
tag_ids = csv.select('tag_id')
tag_join_condition = csv.tag_id == tag_csv.id
tag_rdd = tag_ids.join(tag_csv, on=tag_join_condition, how="inner")

# Preview the joined rows.
tag_rdd.show()
+------+---+--------------------+--------------------+--------+--------+ |tag_id| id| create| update| name|describe| +------+---+--------------------+--------------------+--------+--------+ | 1| 1|7/5/2020 09:36:51...|26/8/2020 00:52:5...| 本土| null| | 2| 2|7/5/2020 09:36:51...|26/8/2020 00:52:5...| 正妹| null| | 3| 3|7/5/2020 09:36:51...|26/8/2020 00:52:1...|第一人稱| null| | 4| 4|7/5/2020 09:36:52...|27/8/2020 03:30:2...| 口交| null| | 5| 5|7/5/2020 09:37:38...|25/8/2020 04:36:3...| 制服| null| | 6| 6|7/5/2020 09:37:39...|26/8/2020 00:52:5...|獨家推薦| null| | 7| 7|7/5/2020 09:37:39...|26/8/2020 00:52:3...| 痴女| null| | 8| 8|7/5/2020 09:37:39...|6/8/2020 11:08:56...| 苗條| null| | 9| 9|7/5/2020 09:37:40...|6/8/2020 11:08:56...| 業餘| null| | 10| 10|7/5/2020 09:37:40...|25/5/2020 01:39:4...| 辣妹| null| | 12| 12|7/5/2020 09:37:41...|26/8/2020 00:52:0...| 自慰| null| | 2| 2|7/5/2020 09:36:51...|26/8/2020 00:52:5...| 正妹| null| | 1| 1|7/5/2020 09:36:51...|26/8/2020 00:52:5...| 本土| null| | 13| 13|7/5/2020 09:37:42...|10/8/2020 04:29:2...| 水手服| null| | 11| 11|7/5/2020 09:37:40...|28/7/2020 23:48:3...|角色扮演| null| | 14| 14|7/5/2020 09:37:42...|17/8/2020 01:12:0...| 學生| null| | 15| 15|7/5/2020 09:37:43...|25/8/2020 04:36:5...| COSPLAY| null| | 5| 5|7/5/2020 09:37:38...|25/8/2020 04:36:3...| 制服| null| | 16| 16|7/5/2020 09:37:42...|8/5/2020 04:00:48...| 女學生| null| | 17| 17|7/5/2020 09:37:44...|6/8/2020 11:08:56...|體內射精| null| +------+---+--------------------+--------------------+--------+--------+ only showing top 20 rows
# Fetch a single joined row as a quick sanity check.
tag_rdd.first()

# Keep only the tag name, then count occurrences of each name.
# Each RDD element is a one-field Row, which serves as the key here.
tr = tag_rdd.select('name')
tag_count_rdd = (
    tr.rdd
    .map(lambda name_row: (name_row, 1))
    .reduceByKey(lambda left, right: left + right)
)

# Sample ten (Row, count) pairs.
tag_count_rdd.take(10)
[(Row(name='本土'), 787), (Row(name='業餘'), 760), (Row(name='辣妹'), 76), (Row(name='角色扮演'), 173), (Row(name='COSPLAY'), 365), (Row(name='配信専用'), 669), (Row(name='無毛'), 46), (Row(name='230ORETD'), 7), (Row(name='眼鏡'), 22), (Row(name='流出'), 340)]
# Build a pandas table of tag counts, most frequent first.
#
# The keys of tag_count_rdd are single-field Row objects. Converting them
# as-is yields a struct column whose pandas cells are Row tuples rather than
# plain strings, so the name is unwrapped before calling toDF(). The default
# column names are kept: `_1` is the tag name, `_2` is the count.
tp = (
    tag_count_rdd
    .sortBy(lambda pair: pair[1], ascending=False)
    # map() keeps element order, so the descending sort survives.
    .map(lambda pair: (pair[0]["name"], pair[1]))
    .toDF()
    .toPandas()
)
tp.head()
| | _1 | _2 |
|---|---|---|
| 0 | (专业拍摄,) | 31857 | 
| 1 | (无字幕,) | 28760 | 
| 2 | (日本,) | 27314 | 
| 3 | (无码,) | 22786 | 
| 4 | (口交,) | 21224 | 
# Bar chart of video counts for the first 40 rows of the tag table,
# with bars ordered by descending y value.
top_tags = tp.head(40)
tag_axis = alt.X('_1', title='标签名称', sort='-y')
video_count_axis = alt.Y('_2', title='视频数量')
alt.Chart(top_tags).mark_bar().encode(x=tag_axis, y=video_count_axis)
# Persist the (tag, count) pairs to HDFS as text.
# NOTE(review): saveAsTextFile is expected to fail if the target directory
# already exists -- confirm before re-running this cell.
tag_count_output_path = 'hdfs://localhost:9000/mapreduce/movie_tag_data'
tag_count_rdd.saveAsTextFile(tag_count_output_path)
# Model (performer) information: the movie<->model link table and the model
# dimension table, both read with the first line as the header.
movie_model_path = "hdfs://localhost:9000/data2/porn_data_movie_pron_model.csv"
model_path = "hdfs://localhost:9000/data2/porn_data_pornmodel.csv"

movie_model_csv = spark.read.option("header", True).csv(movie_model_path)
model_csv = spark.read.option("header", True).csv(model_path)

# Preview the first ten link rows.
movie_model_csv.show(10)
+---+--------+------------+ | id|movie_id|pornmodel_id| +---+--------+------------+ | 1| 47610| 1| | 2| 47611| 2| | 3| 47612| 2| | 4| 47613| 2| | 5| 47614| 2| | 6| 47615| 2| | 7| 47616| 2| | 8| 47617| 2| | 9| 47618| 2| | 10| 47619| 2| +---+--------+------------+ only showing top 10 rows
# Preview the first ten rows of the model table.
model_csv.show(10)
+---+--------------------+--------------------+-----------------+--------+--------------+--------------------------+--------+ | id| create| update| name|describe| name_en| name_jp|third_pk| +---+--------------------+--------------------+-----------------+--------+--------------+--------------------------+--------+ | 1|26/5/2020 05:47:1...|28/5/2020 18:06:4...| 菅野松雪| null| Kanno Sayuki| 菅野さゆき、かんの さゆき| CP_455| | 2|26/5/2020 05:47:1...|27/8/2020 03:30:2...| 素人| null| null| null| CP_1442| | 3|26/5/2020 05:47:4...|25/8/2020 04:36:5...| 网红| null| null| null| CP_1464| | 4|26/5/2020 05:49:4...|20/8/2020 03:15:3...| 模特| null| null| null| CP_1465| | 5|26/5/2020 05:53:3...|27/8/2020 03:30:2...| 水果派a龟| null| null| null| CP_5243| | 6|26/5/2020 05:54:0...|29/5/2020 06:38:5...| 主播| null| null| null| CP_1496| | 7|26/5/2020 05:54:1...|20/8/2020 03:14:4...| 动画人物| null| null| null| CP_1918| | 8|26/5/2020 05:54:3...|28/5/2020 14:29:5...| 古濑玲| null|Hinamori Ayumi| 古瀬リカ| CP_1943| | 9|26/5/2020 05:54:3...|24/8/2020 10:15:1...|上原亚衣/上原亜衣| null| Ai Uehara| 上原亜衣、うえはら あい| CP_847| | 10|26/5/2020 05:54:3...|29/5/2020 00:33:5...|相内史织/相内诗织| null| Aiuchi Shiori|相内しおり、あいうちしおり| CP_1065| +---+--------------------+--------------------+-----------------+--------+--------------+--------------------------+--------+ only showing top 10 rows
# Inner-join each movie-model link row with its model record
# (pornmodel_id == model.id). Despite its name, the result is a DataFrame.
movie_model_links = movie_model_csv.select('movie_id', 'pornmodel_id')
model_join_condition = movie_model_csv.pornmodel_id == model_csv.id
movie_model_rdd = movie_model_links.join(
    model_csv, on=model_join_condition, how="inner"
)

# Preview the movie id, model id and model name of the first ten joined rows.
movie_model_rdd.select('movie_id', 'pornmodel_id', 'name').show(10)
+--------+------------+--------+ |movie_id|pornmodel_id| name| +--------+------------+--------+ | 47610| 1|菅野松雪| | 47611| 2| 素人| | 47612| 2| 素人| | 47613| 2| 素人| | 47614| 2| 素人| | 47615| 2| 素人| | 47616| 2| 素人| | 47617| 2| 素人| | 47618| 2| 素人| | 47619| 2| 素人| +--------+------------+--------+ only showing top 10 rows
# Keep only the model name, then count how many joined rows carry each name.
# Each RDD element is a one-field Row, which serves as the key here.
movie_mode_rdd = movie_model_rdd.select('name')
model_count_rdd = (
    movie_mode_rdd.rdd
    .map(lambda name_row: (name_row, 1))
    .reduceByKey(lambda left, right: left + right)
)

# Sample ten (Row, count) pairs.
model_count_rdd.take(10)
[(Row(name='菅野松雪'), 18), (Row(name='素人'), 25824), (Row(name='网红'), 156), (Row(name='模特'), 161), (Row(name='水果派a龟'), 48), (Row(name='主播'), 262), (Row(name='动画人物'), 917), (Row(name='古濑玲'), 14), (Row(name='上原亚衣/上原亜衣'), 63), (Row(name='相内史织/相内诗织'), 7)]
# Build a pandas table of per-model video counts, most frequent first.
#
# The keys of model_count_rdd are single-field Row objects. Converting them
# as-is yields a struct column whose pandas cells are Row tuples rather than
# plain strings, so the name is unwrapped before calling toDF(). The default
# column names are kept: `_1` is the model name, `_2` is the count.
mtp = (
    model_count_rdd
    .sortBy(lambda pair: pair[1], ascending=False)
    # map() keeps element order, so the descending sort survives.
    .map(lambda pair: (pair[0]["name"], pair[1]))
    .toDF()
    .toPandas()
)
mtp.head()
| | _1 | _2 |
|---|---|---|
| 0 | (素人,) | 25824 | 
| 1 | (动画人物,) | 917 | 
| 2 | (波多野结衣,) | 363 | 
| 3 | (主播,) | 262 | 
| 4 | (仁科百华,) | 162 | 
# Bar chart of video counts per model for rows 1..39 of the sorted table,
# with bars ordered by descending y value.
# NOTE(review): row 0 is skipped -- presumably to keep the dominant top
# entry from flattening the rest of the chart; confirm this is intended.
charted_models = mtp.iloc[1:40]
model_axis = alt.X('_1', title='女优姓名', sort='-y')
video_count_axis = alt.Y('_2', title='视频数量')
alt.Chart(charted_models).mark_bar().encode(x=model_axis, y=video_count_axis)
GitHub: https://github.com/obaby/Porn-Data-Anaylize
 
											
