Reading and Writing MySQL with Spark SQL

The Spark SQL version used here is 1.5.2.

Reading data from MySQL

There are two ways to read a table:

  1. format("jdbc") with an options map
  2. jdbc(url, table, properties)

Method 1

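    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf().setAppName("SQL")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Load the table through the generic JDBC data source
    val srcData = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://IP:PORT/DBNAME",
      "dbtable" -> "TABLE_NAME",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "USER",
      "password" -> "PWD")).load()

    // show() prints the rows and returns Unit, so there is nothing to assign
    srcData.select("name", "age").show()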

Note: replace IP, PORT, DBNAME, TABLE_NAME, USER and PWD with your own values.
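Because every example in this post uses the same five placeholders, you can build the Method 1 options map in one place. The sketch below is a hypothetical helper, not a Spark API. JdbcOptionsSketch and mysqlJdbcOptions are illustrative names. It only assembles the jdbc:mysql://host:port/db URL and the option keys shown above, so it runs without Spark.

```scala
// Hypothetical helper (not part of Spark): fills in the Method 1 placeholders.
object JdbcOptionsSketch {
  // Builds the map passed to sqlContext.read.format("jdbc").options(...)
  def mysqlJdbcOptions(host: String, port: Int, db: String, table: String,
                       user: String, password: String): Map[String, String] =
    Map(
      "url"      -> s"jdbc:mysql://$host:$port/$db",
      "dbtable"  -> table,
      "driver"   -> "com.mysql.jdbc.Driver",
      "user"     -> user,
      "password" -> password
    )

  def main(args: Array[String]): Unit = {
    // Example values only; substitute your own connection details
    val opts = mysqlJdbcOptions("127.0.0.1", 3306, "test", "users", "root", "secret")
    opts.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k = $v") }
  }
}
```

The returned map can then be passed as sqlContext.read.format("jdbc").options(JdbcOptionsSketch.mysqlJdbcOptions(...)).load().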

Method 2

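    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf().setAppName("SQL")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Connection credentials are passed as java.util.Properties
    val properties = new Properties()
    properties.setProperty("user", "USER")
    properties.setProperty("password", "PWD")

    // read.jdbc(url, table, properties)
    val dataFrame = sqlContext.read.jdbc("jdbc:mysql://IP:PORT/DBNAME",
      "TABLE_NAME", properties)
    dataFrame.select("name", "age").show()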

Note: replace IP, PORT, DBNAME, TABLE_NAME, USER and PWD with your own values.
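Method 2 carries the same information as Method 1, split differently. The url and table become separate arguments, and the remaining keys go into a Properties object. The hypothetical helper below makes that mapping explicit. JdbcArgsSketch and toJdbcArgs are not part of Spark. The helper converts a Method 1 options map into the three arguments of read.jdbc. It uses only java.util.Properties, so it runs without Spark.

```scala
import java.util.Properties

// Hypothetical helper (not part of Spark): maps the Method 1 option map onto
// the arguments of read.jdbc(url, table, properties).
object JdbcArgsSketch {
  def toJdbcArgs(opts: Map[String, String]): (String, String, Properties) = {
    val props = new Properties()
    // Every key except url and dbtable becomes a connection property
    for ((k, v) <- opts if k != "url" && k != "dbtable") props.setProperty(k, v)
    (opts("url"), opts("dbtable"), props)
  }

  def main(args: Array[String]): Unit = {
    val (url, table, props) = toJdbcArgs(Map(
      "url" -> "jdbc:mysql://IP:PORT/DBNAME",
      "dbtable" -> "TABLE_NAME",
      "user" -> "USER",
      "password" -> "PWD"))
    println(s"$url $table user=${props.getProperty("user")}")
  }
}
```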

Writing data to MySQL

This example builds an RDD, converts it to a DataFrame and saves that DataFrame to the database.

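    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("SQL")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    case class User(name: String, age: Int)

    val properties = new Properties()
    properties.setProperty("user", "USER")
    properties.setProperty("password", "PWD")

    // Ten sample users (张1 aged 21 up to 张10 aged 30), converted to generic Rows
    val users = sc.parallelize(1 to 10).map(f => User("张" + f, 20 + f)).map(f => Row(f.name, f.age))

    // Explicit schema matching the Row layout: (name: String, age: Int)
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    // Append the rows to TABLE_NAME (Spark creates the table first if it does not exist)
    sqlContext.createDataFrame(users, schema).write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://IP:PORT/DBNAME", "TABLE_NAME", properties)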

Note: replace IP, PORT, DBNAME, TABLE_NAME, USER and PWD with your own values.
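To see exactly which rows the job writes, you can replay the same transformation on a local Scala collection. This sketch does not use Spark. UserRowsPreview is a hypothetical name, and tuples stand in for Spark's Row.

```scala
// Spark-free replay of the write example's RDD transformation (hypothetical helper).
object UserRowsPreview {
  case class User(name: String, age: Int)

  // Same logic as sc.parallelize(1 to 10).map(f => User("张" + f, 20 + f)),
  // applied to a plain Range; (name, age) tuples play the role of Row(name, age).
  def rows: Seq[(String, Int)] =
    (1 to 10).map(f => User("张" + f, 20 + f)).map(u => (u.name, u.age))

  def main(args: Array[String]): Unit = rows.foreach(println)
}
```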

Choosing a mode

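mode controls what happens when the target table already exists. If the table does not exist, every mode creates it and writes the data.
SaveMode.Append: the new rows are added after the existing data.
SaveMode.Overwrite: the existing table is dropped and recreated, so its old data is lost.
SaveMode.ErrorIfExists: an exception such as "Table user already exists" is thrown. This is the default mode.
SaveMode.Ignore: if the table already exists, the write is skipped and the new data is discarded.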
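The four modes can be summarized as a small in-memory model. This is a sketch, not Spark code. It assumes the Spark 1.5.2 JDBC writer behaves as described above and represents a table as Option[Vector[A]], where None means the table does not exist. The names SaveModeSketch and save are hypothetical.

```scala
// Simplified model of how each SaveMode treats a JDBC table (not Spark's implementation).
object SaveModeSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  // table: current contents, or None if the table does not exist yet
  def save[A](table: Option[Vector[A]], data: Vector[A], mode: Mode): Vector[A] =
    (table, mode) match {
      // No table: every mode creates it and inserts the data
      case (None, _)                => data
      case (Some(old), Append)      => old ++ data
      // Drop and recreate, so the old rows disappear
      case (Some(_), Overwrite)     => data
      case (Some(_), ErrorIfExists) => throw new RuntimeException("Table already exists.")
      // Only existence is checked: even an empty table makes Ignore skip the write
      case (Some(old), Ignore)      => old
    }

  def main(args: Array[String]): Unit = {
    val existing = Some(Vector("张1"))
    for (m <- Seq(Append, Overwrite, Ignore))
      println(s"$m -> ${save(existing, Vector("张2"), m)}")
  }
}
```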

StructType

StructType takes a collection of StructField objects as its argument.
StructField has the following four fields:

    case class StructField(
        // column name
        name: String,
        // data type
        dataType: DataType,
        // whether the column may contain null
        nullable: Boolean = true,
        // metadata
        metadata: Metadata = Metadata.empty)

Commonly used DataTypes include:

StringType
FloatType
IntegerType
ByteType
ShortType
DoubleType
LongType
BinaryType
BooleanType
DateType
DecimalType
TimestampType
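Row is untyped, so each value placed in a Row must match the declared DataType. The sketch below pairs each DataType above with the JVM value type that the Spark SQL programming guide associates with it. RowValueCheck is a hypothetical helper, not a Spark API. It takes type names as plain strings so that it runs without Spark.

```scala
// Hypothetical check: does a value have the JVM type Spark SQL expects for a DataType?
object RowValueCheck {
  def conforms(dataType: String, value: Any): Boolean = (dataType, value) match {
    case ("StringType", _: String)                => true
    case ("FloatType", _: Float)                  => true
    case ("IntegerType", _: Int)                  => true
    case ("ByteType", _: Byte)                    => true
    case ("ShortType", _: Short)                  => true
    case ("DoubleType", _: Double)                => true
    case ("LongType", _: Long)                    => true
    case ("BinaryType", _: Array[Byte])           => true
    case ("BooleanType", _: Boolean)              => true
    case ("DateType", _: java.sql.Date)           => true
    case ("DecimalType", _: java.math.BigDecimal) => true
    case ("TimestampType", _: java.sql.Timestamp) => true
    case _                                        => false
  }

  def main(args: Array[String]): Unit = {
    println(conforms("IntegerType", 21))  // an Int, as in Row(f.name, f.age)
    println(conforms("IntegerType", 21L)) // a Long would need LongType instead
  }
}
```

In the write example, this is why Row(f.name, f.age) is paired with StringType and IntegerType. f.age is an Int, whereas a Long value would need LongType.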

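When Spark has to create the target table itself, it maps each DataType to a MySQL column type as follows (excerpt from the Spark source):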

    // field is a StructField; the result is the column type used in CREATE TABLE
    field.dataType match {
      case IntegerType => "INTEGER"
      case LongType => "BIGINT"
      case DoubleType => "DOUBLE PRECISION"
      case FloatType => "REAL"
      case ShortType => "INTEGER"
      case ByteType => "BYTE"
      case BooleanType => "BIT(1)"
      case StringType => "TEXT"
      case BinaryType => "BLOB"
      case TimestampType => "TIMESTAMP"
      case DateType => "DATE"
      case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
      case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
    }

Note: StringType maps to TEXT. There is currently no VARCHAR mapping, so no column length can be specified.
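You can replay the mapping above without Spark to preview the CREATE TABLE statement Spark would issue for a schema. In this sketch, SimpleType and Field are hypothetical stand-ins for DataType and StructField. The NOT NULL suffix for non-nullable fields is assumed to mirror Spark's behavior. The exact whitespace in Spark's generated SQL may differ.

```scala
// Spark-free sketch: build a CREATE TABLE statement from the DataType mapping above.
object CreateTableSketch {
  // Hypothetical stand-ins for Spark's DataType hierarchy
  sealed trait SimpleType
  case object IntegerT extends SimpleType
  case object LongT extends SimpleType
  case object DoubleT extends SimpleType
  case object FloatT extends SimpleType
  case object ShortT extends SimpleType
  case object ByteT extends SimpleType
  case object BooleanT extends SimpleType
  case object StringT extends SimpleType
  case object BinaryT extends SimpleType
  case object TimestampT extends SimpleType
  case object DateT extends SimpleType
  final case class DecimalT(precision: Int, scale: Int) extends SimpleType

  // Hypothetical stand-in for StructField (metadata omitted)
  final case class Field(name: String, dataType: SimpleType, nullable: Boolean = true)

  // Mirrors the match shown above, case for case
  def jdbcType(t: SimpleType): String = t match {
    case IntegerT       => "INTEGER"
    case LongT          => "BIGINT"
    case DoubleT        => "DOUBLE PRECISION"
    case FloatT         => "REAL"
    case ShortT         => "INTEGER"
    case ByteT          => "BYTE"
    case BooleanT       => "BIT(1)"
    case StringT        => "TEXT"
    case BinaryT        => "BLOB"
    case TimestampT     => "TIMESTAMP"
    case DateT          => "DATE"
    case DecimalT(p, s) => s"DECIMAL($p,$s)"
  }

  // One "name TYPE" entry per field; non-nullable fields get NOT NULL
  def createTableSql(table: String, fields: Seq[Field]): String = {
    val cols = fields.map { f =>
      val notNull = if (f.nullable) "" else " NOT NULL"
      s"${f.name} ${jdbcType(f.dataType)}$notNull"
    }
    s"CREATE TABLE $table (${cols.mkString(", ")})"
  }

  def main(args: Array[String]): Unit =
    println(createTableSql("TABLE_NAME", Seq(Field("name", StringT), Field("age", IntegerT))))
}
```

StringType always becomes TEXT here. A common workaround is to create the table in MySQL beforehand with the VARCHAR columns you want, then write with SaveMode.Append. Append does not issue CREATE TABLE when the table already exists.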
