ElasticSearch 安装与Spark集成

安装篇

下载

1
wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz

本文使用当前最新版本(2.3.5),具体其他版本信息可以去官网查看。

安装

  • 解压文件

    1
    tar -zxvf elasticsearch-2.3.5.tar.gz
  • 执行启动

    1
    2
    3
    # 以非root用户执行
    ./bin/elasticsearch
    #如果后台启动就加上 -d
  • 测试

    1
    curl -X GET http://localhost:9200/

配置

修改配置文件(地址为config/elasticsearch.yml)

1
vi config/elasticsearch.yml

主要修改以下参数:

参数 解释
*cluster.name 集群的名字,一样即为同一集群 es_test(默认值是elasticsearch)
*node.name 集群节点的名字 spark1(每个机器这个配置不一样)
*network.host 集群的对外服务IP 192.068.0.180(对外服务的主机IP)
*http.port 集群的对外服务端口 9200(默认就是9200)
node.master 是否有资格被选为主节点 true
node.data 是否存储索引数据 true
index.number_of_shards 索引分片个数 5(默认值)
index.number_of_replicas 索引副本个数 1(默认值)
path.conf 配置文件的存储路径 默认是config文件夹
path.data 索引数据的存储路径 默认是es下data文件夹,可设置多路径,用逗号隔开
path.work 临时文件的存储路径 默认是es下的work文件夹
path.logs 日志文件的存储路径 默认是es下的logs文件夹
path.plugins 插件的存放路径 默认是es下的plugins文件夹
bootstrap.mlockall 设置为true来锁住内存 true
http.max_content_length 内容的最大容量 默认100MB
http.enabled 是否对外提供服务 true
gateway.type gateway的类型 默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,HDFS和amazon s3
gateway.recover_after_nodes 设置集群中N个节点启动时进行数据恢复 默认为1
gateway.recover_after_time 设置初始化数据恢复进程的超时时间 5m(默认是5分钟)
gateway.expected_nodes 设置这个集群中节点的数量 默认是2
cluster.routing.allocation.node_initial_primaries_recoveries 初始化数据恢复时,并发恢复线程的个数 默认4
cluster.routing.allocation.node_concurrent_recoveries 添加删除节点或负载均衡时并发恢复线程的个数 默认4
indices.recovery.max_size_per_sec 设置数据恢复时限制的带宽,如入100mb 默认0,无限制
indices.recovery.concurrent_streams 设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数 默认5
discovery.zen.minimum_master_nodes 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点 默认1
discovery.zen.ping.timeout 设置集群中自动发现其它节点时ping连接超时时间 3s(默认3秒)
discovery.zen.ping.multicast.enabled 设置是否打开多播发现节点 默认true
discovery.zen.ping.unicast.hosts 设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点 [“host1”, “host2:port”, “host3[portX-portY]”]

带*号的是必需配置

集群

复制去其他机器,修改node.name,启动即可。

插件安装

安装插件使用bin/plugin命令
先看下plugin的命令有哪些

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#查看命令
./bin/plugin -h
#返回结果如下:
NAME
plugin - Manages plugins
SYNOPSIS
plugin <command>
DESCRIPTION
Manage plugins
COMMANDS
install Install a plugin
remove Remove a plugin
list List installed plugins
NOTES
[*] For usage help on specific commands please type "plugin <command> -h"

安装插件命令即为

1
2
# 安装kopf 后面的参数为github地址
./bin/plugin install lmenezes/elasticsearch-kopf

集成篇

参考官网下的介绍。
其中与spark的集成在这里

导入jar

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark_2.10</artifactId>
<version>2.3.2</version>
</dependency>

配置ES参数

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)

//自动创建index
conf.set("es.index.auto.create", "true")
// conf.set("es.nodes.wan.only", "true")
//对外服务地址
conf.set("es.nodes", "10.10.40.111")
//对外服务端口,默认是9200
conf.set("es.port","9222");
//非必须,本文需要用mid作为_id,故加上当前配置
//其他配置参考:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
conf.set("es.mapping.id","mid")

写入ES

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.elasticsearch.spark._

val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val rdd = sc.makeRDD(Seq(numbers, airports))

//方法一
//saveToEs方法的参数即为:index/type
rdd.saveToEs("spark/docs")

//方法二
import org.elasticsearch.spark.rdd.EsSpark
EsSpark.saveToEs(rdd, "spark/docs")

读取ES

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._

...

val conf = ...
val sc = new SparkContext(conf)

val RDD = sc.esRDD("radio/artists")
//带参数的
val rdd2 = sc.esRDD("radio/artists", "?q=me*")

其他的使用方式请参照官网介绍。
更多文档可以参考这里

热评文章