This post uses Spark SQL 1.5.2.
Reading data from MySQL
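There are two ways to read:
- format: sqlContext.read.format("jdbc") with an options map
- jdbc: sqlContext.read.jdbc(url, table, properties)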
Method 1
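import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("SQL")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Load the MySQL table through the generic "jdbc" data source
val srcData = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://IP:PORT/DBNAME",
  "dbtable" -> "TABLE_NAME",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "USER",
  "password" -> "PWD")).load()
// show() prints the rows and returns Unit, so its result is not assigned
srcData.select("name", "age").show()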
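Note: replace IP, PORT, DBNAME, TABLE_NAME, USER and PWD with your own values. The MySQL JDBC driver (MySQL Connector/J, which provides com.mysql.jdbc.Driver) must also be on the classpath.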
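The url option always follows the pattern jdbc:mysql://IP:PORT/DBNAME. As a minimal sketch, assuming you want to keep the connection settings in one place, the hypothetical MySqlTarget case class below (an illustration, not part of Spark) assembles the same options map that Method 1 passes to options(...).

```scala
// Hypothetical helper: MySqlTarget and readOptions are illustration names,
// not Spark APIs. Substitute your own host, port, database, table and login.
final case class MySqlTarget(
    host: String,
    port: Int,
    database: String,
    table: String,
    user: String,
    password: String) {

  // Builds jdbc:mysql://IP:PORT/DBNAME, the value of the "url" option
  def url: String = s"jdbc:mysql://$host:$port/$database"

  // The same five keys that Method 1 passes to options(...)
  def readOptions: Map[String, String] = Map(
    "url" -> url,
    "dbtable" -> table,
    "driver" -> "com.mysql.jdbc.Driver",
    "user" -> user,
    "password" -> password)
}
```

With Spark available, it would be used as sqlContext.read.format("jdbc").options(target.readOptions).load().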
Method 2
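import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("SQL")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Connection properties handed to the JDBC driver
val properties = new Properties()
properties.setProperty("user", "USER")
properties.setProperty("password", "PWD")
// read.jdbc(url, table, properties) loads the whole table as a DataFrame
val dataFrame = sqlContext.read.jdbc("jdbc:mysql://IP:PORT/DBNAME",
  "TABLE_NAME", properties)
dataFrame.select("name", "age").show()
Note: replace IP, PORT, DBNAME, TABLE_NAME, USER and PWD with your own values.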
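In Method 2 the url and table are plain arguments, and only the login goes into the Properties object. A minimal sketch, assuming you want a reusable factory for it: MySqlProps is a hypothetical name, not a Spark API, and the resulting object can be passed both to read.jdbc here and to write.jdbc in the next section.

```scala
import java.util.Properties

// Hypothetical helper, not part of Spark: builds the Properties object that
// Method 2 passes to sqlContext.read.jdbc(url, table, properties).
object MySqlProps {
  def apply(user: String, password: String): Properties = {
    val props = new Properties()
    // Standard JDBC connection property names for the login credentials
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }
}
```

Usage with Spark: sqlContext.read.jdbc("jdbc:mysql://IP:PORT/DBNAME", "TABLE_NAME", MySqlProps("USER", "PWD")).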
Writing data to MySQL
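This example shows how to write an RDD to the database.
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("SQL")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
case class User(name: String, age: Int)
val properties = new Properties()
properties.setProperty("user", "USER")
properties.setProperty("password", "PWD")
// Ten sample users (张1, 21) ... (张10, 30), each converted to a Row
val users = sc.parallelize(1 to 10).map(f => User("张" + f, 20 + f)).map(f => Row(f.name, f.age))
// The schema must list the fields in the same order as the Row values
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))
// Append the rows to TABLE_NAME; Spark creates the table first if it does not exist
sqlContext.createDataFrame(users, schema).write.mode(SaveMode.Append)
  .jdbc("jdbc:mysql://IP:PORT/DBNAME", "TABLE_NAME", properties)
Note: replace IP, PORT, DBNAME, TABLE_NAME, USER and PWD with your own values.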
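To see exactly which rows the example above writes, here is a pure-Scala reproduction of its data-generation step with no Spark involved. SampleUsers is a hypothetical name, and plain tuples stand in for Row; each tuple follows the schema order (name, age).

```scala
// Pure-Scala reproduction of the write example's data, without Spark.
object SampleUsers {
  final case class User(name: String, age: Int)

  // Same generation as sc.parallelize(1 to 10).map(...): tuples replace Row
  val rows: Seq[(String, Int)] =
    (1 to 10).map(f => User("张" + f, 20 + f)).map(u => (u.name, u.age))
}
```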
Choosing a mode
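mode determines how the data is saved when the target table already exists:
SaveMode.Append: the new rows are added after the existing data; if the table does not exist, it is created first.
SaveMode.Overwrite: the existing table is dropped and recreated from the DataFrame's schema, so the original data is lost.
SaveMode.ErrorIfExists: if the table already exists, an exception such as "Table user already exists." is thrown. This is the default.
SaveMode.Ignore: if the table already exists, nothing is written and the new data is discarded.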
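The four modes can be summarized with a small in-memory model. This is a simplified sketch, not Spark code: SaveModeModel and its case objects are hypothetical names, a table is modeled as Option[Vector[String]] (None means the table does not exist), and the real DataFrameWriter.jdbc does the same work with JDBC statements.

```scala
// Simplified in-memory model of the four SaveMode behaviors described above.
// None = table does not exist; Some(rows) = table exists with those rows.
object SaveModeModel {
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  def write(table: Option[Vector[String]], rows: Vector[String], mode: Mode): Option[Vector[String]] =
    (table, mode) match {
      // Table missing: every mode creates it and inserts the rows
      case (None, _) => Some(rows)
      // Append: keep the old rows and add the new ones after them
      case (Some(old), Append) => Some(old ++ rows)
      // Overwrite: drop and recreate the table, keeping only the new rows
      case (Some(_), Overwrite) => Some(rows)
      // ErrorIfExists (the default): fail because the table exists
      case (Some(_), ErrorIfExists) => throw new RuntimeException("Table already exists.")
      // Ignore: the table exists, so the new rows are discarded
      case (Some(old), Ignore) => Some(old)
    }
}
```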
StructType
StructType takes a collection of StructField values as its argument.
StructField has the following four fields:
case class StructField(
    // column name
    name: String,
    // data type
    dataType: DataType,
    // whether the column may contain null
    nullable: Boolean = true,
    // metadata
    metadata: Metadata = Metadata.empty)
The available DataType types include the following (all in org.apache.spark.sql.types):
StringType
FloatType
IntegerType
ByteType
ShortType
DoubleType
LongType
BinaryType
BooleanType
DateType
DecimalType
TimestampType
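Each value placed in a Row must be of the JVM type that its StructField's DataType expects; in the write example, Row("张1", 21) fits StringType and IntegerType. The sketch below encodes that correspondence, following the data-type table in the Spark SQL programming guide. RowValueTypes is a hypothetical name, and the DataType names are plain strings so the sketch runs without Spark on the classpath.

```scala
// Which DataType a Row value corresponds to, per the Spark SQL guide's type table.
// DataType names are strings here so no Spark dependency is needed.
object RowValueTypes {
  def dataTypeFor(value: Any): String = value match {
    case _: String               => "StringType"
    case _: Float                => "FloatType"
    case _: Int                  => "IntegerType"
    case _: Byte                 => "ByteType"
    case _: Short                => "ShortType"
    case _: Double               => "DoubleType"
    case _: Long                 => "LongType"
    case _: Array[Byte]          => "BinaryType"
    case _: Boolean              => "BooleanType"
    case _: java.sql.Date        => "DateType"
    case _: java.math.BigDecimal => "DecimalType"
    case _: java.sql.Timestamp   => "TimestampType"
    case other => throw new IllegalArgumentException(s"No matching DataType for $other")
  }
}
```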
DataType values map to MySQL column types as follows. This match comes from the code Spark uses to build the column definitions of the CREATE TABLE statement when a JDBC write has to create the table:
// field is one StructField of the DataFrame's schema
field.dataType match {
  case IntegerType => "INTEGER"
  case LongType => "BIGINT"
  case DoubleType => "DOUBLE PRECISION"
  case FloatType => "REAL"
  case ShortType => "INTEGER"
  case ByteType => "BYTE"
  case BooleanType => "BIT(1)"
  case StringType => "TEXT"
  case BinaryType => "BLOB"
  case TimestampType => "TIMESTAMP"
  case DateType => "DATE"
  case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
  case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
}
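Note: StringType maps to TEXT; in this version there is no way to get a VARCHAR column or to specify a length. Likewise, ByteType maps to BYTE, which is not a MySQL column type.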
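Because Spark only issues CREATE TABLE when the table does not exist yet, one way around the TEXT limitation is to create the table in MySQL yourself (for example with VARCHAR columns) and then write with SaveMode.Append. The sketch below generates such a statement: it applies the mapping above, plus a hypothetical per-column override. CreateTableSketch, Column, ddl and overrides are illustration names, not Spark APIs. Type names are strings so it runs without Spark, DecimalType is omitted for brevity, and the spacing differs slightly from Spark's own output.

```scala
// Simplified sketch: derive a CREATE TABLE statement from a schema using the
// DataType-to-MySQL mapping above, with optional per-column type overrides.
object CreateTableSketch {
  final case class Column(name: String, dataType: String, nullable: Boolean = true)

  // Same mapping as the match above (DecimalType omitted)
  private val defaults = Map(
    "IntegerType" -> "INTEGER", "LongType" -> "BIGINT", "DoubleType" -> "DOUBLE PRECISION",
    "FloatType" -> "REAL", "ShortType" -> "INTEGER", "ByteType" -> "BYTE",
    "BooleanType" -> "BIT(1)", "StringType" -> "TEXT", "BinaryType" -> "BLOB",
    "TimestampType" -> "TIMESTAMP", "DateType" -> "DATE")

  def ddl(table: String, columns: Seq[Column], overrides: Map[String, String] = Map.empty): String = {
    val defs = columns.map { c =>
      // An explicit override wins; otherwise fall back to the default mapping
      val sqlType = overrides.getOrElse(c.name,
        defaults.getOrElse(c.dataType,
          throw new IllegalArgumentException(s"Don't know how to save ${c.name} to JDBC")))
      if (c.nullable) s"${c.name} $sqlType" else s"${c.name} $sqlType NOT NULL"
    }
    s"CREATE TABLE $table (${defs.mkString(", ")})"
  }
}
```

For example, overriding name with VARCHAR(64) yields CREATE TABLE user (name VARCHAR(64), age INTEGER NOT NULL); run that in MySQL first, then append from Spark.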