Sparksql练习题

练习题

-------------------------------以下使用Structured Streaming:-------------------------------
1、请使用Structured Streaming读取Socket数据,统计出每个单词的个数
2、请使用Structured Streaming读取student_info文件夹写的csv文件,
2.1、统计出文件中的男女生各有多少人
2.2、统计出姓“王”女生的人数
2.3、统计出姓“王”男生和女生的比例
3、请使用Structured Streaming读取department_info文件夹写的csv文件
3.1统计出各个院系的分别多少条信息
4、请使用Structured Streaming读取student_score文件夹写的csv文件
4.1、统计出每个班级的最高分数
4.2、统计出男生最高分
4.3、统计出女生最高分
4.4、分别统计出男生和女生的分数前三名
4.5、统计出分数在500分以上的人数
4.7、统计出分数在300分以下的人中男女各占多少

import org.apache.spark.sql.{DataFrame, SparkSession}/*** 使用Structured Streaming读取Socket数据,把单词和单词的反转组成 json 格式写入到当前目录中的file文件夹中* (abc,cba)*/
object demo01 {
def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()spark.sparkContext.setLogLevel("WARN")import spark.implicits._//获取每行的数据val lines: DataFrame = spark.readStream.format("socket") // 设置数据源.option("host", "cdh01").option("port", 10000).load//val words: DataFrame = lines.as[String].flatMap(line => {line.split("\\W+").map(word => {// \\W+  空格(word, word.reverse)})}).toDF("原单词", "反转单词")words.writeStream.outputMode("append").format("json") //  // 支持 "orc", "json", "csv".option("path", "./file") // 输出目录.option("checkpointLocation", "./ck1")  // 必须指定 checkpoint 目录.start.awaitTermination()}
}
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
2、请使用Structured Streaming读取student_info文件夹写的csv文件,
2.1、统计出文件中的男女生各有多少人
2.2、统计出姓“王”女生的人数
2.3、统计出姓“王”男生和女生的比例*/
object demo02 {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("ReadFromCSV").getOrCreate()spark.sparkContext.setLogLevel("WARN")val student_info_schema: StructType = new StructType().add("学号","string").add("姓名","string").add("性别","string").add("班级编号","integer").add("入学日期","String")//2.接收数据import spark.implicits._val student_info: DataFrame = spark.readStream.format("csv").option("header","true")//说明csv文件有标题.schema(student_info_schema).load("C:\\Users\\hasee\\Desktop\\4.16\\student_info")//2.1、统计出文件中的男女生各有多少人val result1: Dataset[Row] =student_info.groupBy("性别").count()//2.2、统计出姓“王”男生和女生的各有多少人val result2: Dataset[Row] =student_info.filter($"姓名" .startsWith("王") ).groupBy("性别").count()//输出结果result2.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0))// 触发器 数字表示毫秒值. 0 表示立即处理.start().awaitTermination()}
}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.StructType
/**
3、请使用Structured Streaming读取department_info文件夹写的csv文件
3.1统计出各个院系的分别多少条信息
*/
object demo03 {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("ReadFromCSV").getOrCreate()spark.sparkContext.setLogLevel("WARN")//读取数据//设置数据的结构val department_info_schema: StructType = new StructType().add("院系编号", "string").add("院系名称", "string")val department_info: DataFrame = spark.readStream.format("csv").option("header","true").schema(department_info_schema).load("C:\\Users\\hasee\\Desktop\\4.16\\department_info\\")import spark.implicits._//3.1统计出各个院系的分别多少条信息val result: Dataset[Row] = department_info.groupBy("院系名称").count()result.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0))// 触发器 数字表示毫秒值. 0 表示立即处理.start().awaitTermination()}
}
import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SparkConf, SparkContext}
/**
请使用Structured Streaming读取student_score文件夹写的csv文件
**/
object demo04 {//准备临时存储数据的样例类def main(args: Array[String]): Unit = {//1  创建sparksessionval spark: SparkSession = SparkSession.builder().appName("demo04").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val student_scores_schema: StructType = new StructType().add("sno", "string").add("sname", "string").add("sex", "string").add("cno", "string").add("scores", "string")val frame: DataFrame = spark.read.option("header", true).format("csv").schema(student_scores_schema).load("C:\\Users\\hasee\\Desktop\\4.16\\student_score\\")frame.createOrReplaceTempView("student_scores")//4.1、统计出每个班级的最高分数
//    spark.sql("select cno ,max(scores) from student_scores group by cno ").show()//4.2、统计出男生最高分
//  spark.sql("select max(scores) from student_scores where sex='男' ").show()//4.3、统计出女生最高分
//      spark.sql("select max(scores) from student_scores where sex='女' ").show()//4.4、分别统计出男生和女生的分数前三名//SELECT *, Row_Number() OVER (partition by deptid ORDER BY salary desc) rank FROM employee
//    spark.sql("select * from (SELECT *, Row_Number() OVER (partition by sex ORDER BY scores desc) rank FROM student_scores) t1 where t1.rank<=3").show()//4.5、统计出分数在500分以上的人数
//     spark.sql("select count(scores)  from student_scores  where scores>500").show()//4.6、统计出分数在300分以下的人中男女各占多少
//    spark.sql("select sex, count(*)  from student_scores  where scores<300  group by sex ").showspark.stop()}
}

-------------------------------以下使用sparksql:-------------------------------------------
5.请使用Structured Streaming读取class_info文件夹写的csv文件
5.1、统计出哪个院系的专业最多
5.2、统计出计算机学院中有多少专业
5.3、统计出经济管理学院的会计和工商管理的班级数
5.4、分别统计出每个学院的班级最多的专业
5.5、统计出班级编号以2开头的所有的专业名称

import org.apache.spark.SparkContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}object demo05 {def main(args: Array[String]): Unit = {//1  创建sparksessionval spark: SparkSession = SparkSession.builder().appName("demo05").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val class_info_schema: StructType = new StructType().add("classid", "string").add("classname", "string").add("date", "string").add("yname", "string")val frame: DataFrame = spark.read.option("header", true).format("csv").schema(class_info_schema).load("C:\\Users\\hasee\\Desktop\\4.20\\4.16号练习题50道2.0\\class_info\\")frame.createOrReplaceTempView("class_info")spark.udf.register("getMajorCname", (str: String) => {str.substring(0, str.length - 2)})//    spark.sql("select getMajorCname(classname) from class_info ").show(100)//5.1、统计出哪个院系的专业最多
//    spark.sql("select yname,count(distinct(t1.cname)) as count from  (select classid,getMajorCname(classname) as cname,date,yname  from class_info) t1 group by t1.yname").show(100)
//    spark.sql("select * from (select yname,count(distinct(t1.cname)) as count from  (select classid,getMajorCname(classname) as cname,date,yname  from class_info) t1  group by t1.yname) t2 order by count desc limit 1").show()//    spark.sql(" select * from (select yname,count(distinct(t1.cname)) as count from  (select classid,getMajorCname(classname) as cname,date,yname  from class_info) t1 group by t1.yname) t2 order by count desc limit 1 ").show()//5.2、统计出计算机学院中有多少专业
//          spark.sql("select * from (select yname,count(distinct(t1.cname)) as count from  (select classid,getMajorCname(classname) as cname,date,yname  from class_info) t1 group by t1.yname ) t2 where yname='计算机学院' ").show()//5.3、统计出经济管理学院的会计和工商管理的班级数//          spark.sql("select  '会计',count(classname)  from class_info where  yname='经济管理学院' and classname like '会计%' ").show()
//          spark.sql("select  '工商管理',count(classname)  from class_info where  yname='经济管理学院' and classname like '工商管理%' ").show()//5.4、分别统计出每个学院的班级最多的专业//          spark.sql("select yname,count(classid) as count ,cname from (select classid,getMajorCname(classname) as cname,date,yname  from class_info) t1 group by yname,t1.cname ").show()spark.sql("select * from (select t2.*,rank() over( partition by t2.yname order by t2.count desc ) as rank  from (select yname,count(classid) as count ,cname from (select classid,getMajorCname(classname) as cname,date,yname  from class_info) t1 group by yname,t1.cname ) t2) t3 where t3.rank <= 1").show(100)//5.5、统计出班级编号以2开头的所有的专业名称
//    spark.sql("select   getMajorCname(classname)  as zhuanye from class_info where   classid like '02%' group by zhuanye").show()spark.stop()}
}

表(一)Student (学生表)
属性名 数据类型 可否为空 含 义
Sno varchar (20) 否 学号
Sname varchar (20) 否 学生姓名
Ssex varchar (20) 否 学生性别
Sbirthday datetime 可 学生出生年月
Class varchar (20) 可 学生所在班级

表(二)Course(课程表)
属性名 数据类型 可否为空 含 义
Cno varchar (20) 否 课程号
Cname varchar (20) 否 课程名称
Tno varchar (20) 否 教工编号

表(三)Score(成绩表)
属性名 数据类型 可否为空 含 义
Sno varchar (20) 否 学号
Cno varchar (20) 否 课程号
Degree Decimal(4,1) 可 成绩

表(四)Teacher(教师表)
属性名 数据类型 可否为空 含 义
Tno varchar (20) 否 教工编号
Tname varchar (20) 否 教工姓名
Tsex varchar (20) 否 教工性别
Tbirthday datetime 可 教工出生年月
Prof varchar (20) 可 职称
Depart varchar (20) 否 教工所在部门

表1-2数据库中的数据

表(一)Student

Sno	Sname	Ssex	Sbirthday	class
108	丘东	男	1977-09-01	95033
105	匡明	男	1975-10-02	95031
107	王丽	女	1976-01-23	95033
101	李军	男	1976-02-20	95033
109	王芳	女	1975-02-10	95031
103	陆君	男	1974-06-03	95031

表(二)Course

Cno	Cname	Tno
3-105	计算机导论	825
3-245	操作系统	804
6-166	数字电路	856
9-888	高等数学	831

表(三)Score

Sno	Cno	Degree
103	3-245	86
105	3-245	75
109	3-245	68
103	3-105	92
105	3-105	88
109	3-105	76
101	3-105	64
107	3-105	91
108	3-105	78
101	6-166	85
107	6-166	79
108	6-166	81

表(四)Teacher

Tno	Tname	Tsex	Tbirthday	Prof	Depart
804	李诚	男	1958-12-02	副教授	计算机系
856	张旭	男	1969-03-12	讲师	电子工程系
825	王萍	女	1972-05-05	助教	计算机系
831	刘冰	女	1977-08-14	助教	电子工程系
sparksql的练习题import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
object Test02 {// 定义样例类封装数据case class Student(Sno:String,Sname:String,Ssex:String,Sbirthday:String,Class:String)case class Course(Cno:String,Cname:String,Tno:String)case class Score(Sno:String,Cno:String,Degree:String)case class Teacher(Tno:String,Tname:String,Tsex:String,Tbirthday:String,Prof:String,Depart:String)def main(args: Array[String]): Unit = {// 1. 创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("test02").getOrCreate()val sc: SparkContext = spark.sparkContext// 设置日志级别sc.setLogLevel("WARN")// 读取test文件val rdd1: RDD[String] = sc.textFile("F:\\网课\\Spark\\spark练习\\4.16号练习题50道2.0\\student.txt")val rdd2: RDD[String] = sc.textFile("F:\\网课\\Spark\\spark练习\\4.16号练习题50道2.0\\course.txt")val rdd3: RDD[String] = sc.textFile("F:\\网课\\Spark\\spark练习\\4.16号练习题50道2.0\\score.txt")val rdd4: RDD[String] = sc.textFile("F:\\网课\\Spark\\spark练习\\4.16号练习题50道2.0\\teacher.txt")val student: RDD[Student] = rdd1.map(x=>{val str = x.split("\t");Student(str(0),str(1),str(2),str(3),str(4))})val course: RDD[Course] = rdd2.map(x=>{val str = x.split("\t");Course(str(0),str(1),str(2))})val score: RDD[Score] = rdd3.map(x=>{val str = x.split("\t");Score(str(0),str(1),str(2))})val teacher: RDD[Teacher] = rdd4.map(x=>{val str = x.split("\t");Teacher(str(0),str(1),str(2),str(3),str(4),str(5))})// 导入隐式转换import spark.implicits._// 将RDD转换成DFval stuframe = student.toDF()val couframe = course.toDF()val scoframe = score.toDF()val teaframe = teacher.toDF()// 创建临时表stuframe.createOrReplaceTempView("student")couframe.createOrReplaceTempView("course")scoframe.createOrReplaceTempView("score")teaframe.createOrReplaceTempView("teacher")/* 6. 查询Student表中“95031”班或性别为“女”的同学记录*/spark.sql("Select * from student where sno = '95031' or ssex = '女'").show()/* 7. 以Class降序,升序查询Student表的所有记录 */spark.sql("Select * from student order by class desc").show()/* 8. 以Cno升序、Degree降序查询Score表的所有记录。 */spark.sql("Select * from score order by cno,degree desc").show()/* 9. 查询“95031”班的学生。 */spark.sql("Select sname from student where class = '95031'").show()/* 10. 查询Score表中的最高分的学生学号和课程号。(子查询或者排序) */spark.sql("Select sno,cno from score order by degree desc limit 1").show()/* 11. 查询每门课的平均成绩。  */spark.sql("Select first(course.Cname),avg(degree) from score,course where score.Cno = course.Cno  group by score.Cno").show()/* 12. 查询Score表中至少有5名学生选修的并以3开头的课程的平均分数。  */spark.sql("Select first(cno),avg(Degree) from score where cno like '3%' group by cno having count(cno) >= 5").show()/* 13.查询分数大于70,小于90的Sno列  */spark.sql("Select sno from score where Degree > 70 and Degree < 90 ").show()/* 14.查询所有学生的Sname、Cno和Degree列。 */spark.sql("select sname,cno,degree from score,student where score.sno = student.sno").show()/* 15.查询所有学生的Sno、Cname和Degree列。 */spark.sql("select sno,cname,degree from score,course where score.cno=course.cno").show()/* 16.查询所有学生的Sname、Cname和Degree列。 */spark.sql("""|select sname,cname,degree from score,course,student|where score.cno = course.cno and student.sno = score.sno""".stripMargin).show()/* 17.查询“95033”班学生的平均分。*/spark.sql("select first(class),avg(Degree) from student,score where student.sno = score.sno and class = '95033'").show()/* 18.查询所有选修“计算机导论”课程的“女”同学的成绩表。 */spark.sql("""|select sname,cname,ssex,Degree from student,course,score|where student.sno = score.sno and course.cno=score.cno|and ssex = "女" and cname = "计算机导论"""".stripMargin).show()/* 19.查询选修“3-105”课程的成绩高于“109”号同学成绩的所有同学的记录。*/spark.sql("""|select * from score where cno = "3-105" and degree > (select degree from score where sno = "109" and cno = "3-105" )|""".stripMargin).show()/* 20.查询score中选学多门课程的同学中分数为非最高分成绩的记录。  */spark.sql("select * from score a where sno in (select sno from score group by sno having count(*)>1) and degree<(select max(degree) from score b where a.cno=b.cno )").show()/* 21.查询成绩高于学号为“109”、课程号为“3-105”的成绩的所有记录。 */spark.sql("select * from score where degree > (select degree from score where sno = '109' and cno = '3-105')").show()/* 22. 查询和学号为105的同学同年出生的所有学生的Sno、Sname和Sbirthday列。*/spark.udf.register("sub",(str:String,num1:Int,num2:Int)=>str.substring(num1,num2))spark.sql("select sno,sname,sbirthday from student where sub(Sbirthday,0,5) = (select sub(Sbirthday,0,5) from student where sno = '105') ").show()/* 23. 查询“张旭“教师任课的学生成绩 */spark.sql("""|select  sno,degree from score,course,teacher|where tname = "张旭" and teacher.tno = course.tno|and course.Cno = score.Cno""".stripMargin).show()/* 24. 查询选修某课程的同学人数多于4人的教师姓名。 */spark.sql("""|select first(tname) from teacher,score,course|where teacher.tno = course.tno|and course.Cno = score.Cno|group by score.Cno having count(score.Cno) > 4""".stripMargin).show()/* 25. 查询95033班和95031班全体学生的记录。 */spark.sql("select * from student where class= '95033' or class = '95031'").show()/* 26. 查询存在有85分以上成绩的课程Cno.  */spark.sql("select distinct cno from score where degree >= 85").show()/* 27. 查询出“计算机系“教师所教课程的成绩表。 */spark.sql("""|select  score.*  from score,Course,teacher|where  teacher.tno = course.tno|and course.Cno = score.Cno|and teacher.Depart = "计算机系"""".stripMargin).show()/* 28. 查询“计算机系”与“电子工程系“不同职称的教师的Tname和Prof */spark.sql("""|select tname,prof from teacher where depart = '计算机系' and prof not in|(select prof from teacher where depart = '电子工程系')""".stripMargin).show()/* 29. 查询选修编号为“3-105“课程且成绩至少高于选修编号为“3-245”的同学的Cno、Sno和Degree,并按Degree从高到低次序排序。 */spark.sql("""|select s1.cno,s1.sno,s1.degree from score s1,score s2|where s1.cno = "3-105" and s2.cno = "3-245"|and s1.Sno = s2.Sno|and s1.degree > s2. degree|order by s1.degree desc""".stripMargin).show()/* 30. 查询选修编号为“3-105”且成绩高于选修编号为“3-245”课程的同学的Cno、Sno和Degree. */spark.sql("""|select cno,sno,degree from score|where cno = "3-105" and degree > (select max(degree) from score where cno = "3-245")""".stripMargin).show()/* 31. 查询所有教师和同学的name、sex和birthday. */spark.sql("""|select distinct Sname as name,Ssex as sex,Sbirthday as birthday from student|union|select distinct Tname as name,Tsex as sex,Tbirthday as birthday from Teacher|""".stripMargin).show()/* 32. 查询所有“女”教师和“女”同学的name、sex和birthday. */spark.sql("""|select distinct Sname as name,Ssex as sex,Sbirthday as birthday from student where Ssex = "女"|union|select distinct Tname as name,Tsex as sex,Tbirthday as birthday from Teacher where Tsex = "女"|""".stripMargin).show()/* 33. 查询成绩比该课程平均成绩低的同学的成绩表。 */spark.sql("""|select * from score s1|where degree < (select avg(degree) from score s2 where s1.Cno = s2.Cno)""".stripMargin).show()/* 34. 查询所有任课教师的Tname和Depart. */spark.sql("""|select tname,depart from teacher|where tno in (select tno from course where cno in (select distinct cno from score) )""".stripMargin).show()/* 35. 查询所有未讲课的教师的Tname和Depart.  */spark.sql("""|select tname,depart from teacher|where tno not in (select tno from course where cno in (select distinct cno from score) )""".stripMargin).show()/* 36. 查询至少有2名男生的班号。 */spark.sql("""|select class from student|where ssex = "男"|group by class having count(class) >= 2""".stripMargin).show()/* 37. 查询Student表中不姓“王”的同学记录。 */spark.sql("""|select * from student|where sname not like "王%"""".stripMargin).show()/* 38. 查询Student表中每个学生的姓名和年龄。将函数运用到spark sql中去计算,可以直接拿String的类型计算不需要再转换成数值型 默认是会转换成Double类型计算浮点型转整型 */spark.sql("""|select Sname,year(current_date)-year(Sbirthday) from student""".stripMargin).show()/* 39. 查询Student表中最大和最小的Sbirthday日期值。 时间格式最大值,最小值*/spark.sql("""|select max(Sbirthday),min(Sbirthday) from student""".stripMargin).show()/* 40. 以班号和年龄从大到小的顺序查询Student表中的全部记录。 查询结果排序*/spark.sql("""|select * from student order by class desc,year(current_date)-year(Sbirthday) desc""".stripMargin).show()/* 41. 查询“男”教师及其所上的课程。 */spark.sql("""|select cname from course,teacher|where teacher.Tno = course.Tno and Tsex = "男"""".stripMargin).show()/* 42. 查询最高分同学的Sno、Cno和Degree列。  */spark.sql("""|select * from score|where degree = (select max(degree) from score)""".stripMargin).show()/* 43. 查询和“李军”同性别的所有同学的Sname.*/spark.sql("""|select sname from student|where ssex = (select ssex from student where sname = "李军") and sname != "李军"""".stripMargin).show()/* 44. 查询和“李军”同性别并同班的同学Sname. */spark.sql("""|select distinct s1.sname from student s1,student s2|where s1.ssex = s2.ssex and s1.class = s2.class|and s1.sname != "李军"""".stripMargin).show()/* 45. 查询所有选修“计算机导论”课程的“男”同学的成绩表。 */spark.sql("""|select degree from score,student,course|where score.cno = course.cno|and score.sno = student.sno|and student.ssex = "男"|and course.cname = "计算机导论"""".stripMargin).show()/* 46. 查询Student表中的所有记录的Sname、Ssex和Class列。 */spark.sql("""|select sname,ssex,class from student""".stripMargin).show()/* 47. 查询教师所有的单位即不重复的Depart列。*/spark.sql("""|select distinct depart from teacher""".stripMargin).show()/* 48. 查询Student表的所有记录 */spark.sql("""|select * from student""".stripMargin).show()/* 49. 查询Score表中成绩在60到80之间的所有记录。 */spark.sql("""|select * from score where degree > 60 and degree < 80""".stripMargin).show()/* 50. 查询Score表中成绩为85,86或88的记录。 */spark.sql("""|select * from score where Degree in (85,86,88)""".stripMargin).show()}
}