CRUD(Create, Retrieve, Update, Delete)是数据库系统的四种基本操作,分别表示创建、查询、更改、删除,俗称“增删改查”。Elasticsearch作为NoSQL数据库(虽然ES是为搜索引擎而生的,但我更愿意将其看作带有强大文本搜索功能的NoSQL)。
以下示例基于Elasticsearch 2.4版本。
Create
在默认情况下,ES的REST接口的端口号为9200,对接Java client的端口号为9300。
Create操作为向index中索引文档,若index不存在则ES会自动创建;
$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{}'
Java API("org.elasticsearch" % "elasticsearch" % "2.4.1")通过TransportClient
与ES集群连接,CRUD操作便是基于此而实现的。
final Settings settings = Settings.settingsBuilder() .put("client.transport.sniff", true) .put("client.transport.ping_timeout", 20, TimeUnit.SECONDS) .put("client", true) .put("data", false) .put("cluster.name", "") .build(); Client client = TransportClient.builder() .settings(settings).build() .addTransportAddresses( new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300), new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
创建index或索引document:
import org.elasticsearch.action.index.IndexResponse;IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(documentJson) .get();
Retrieve
ES的查询DSL大致可以分为两种:
- Query DSL,主要配合bool、match等使用,相当于SQL中的where子句;
- Aggregations,相当于SQL中的group by部分,细分为如下:
- Bucketing,聚合函数只能是
count(*)
,表示的是doc命中数,可以嵌套子aggs; - Metric,相比于Bucketing其非常灵活,可配合
avg
、max
、sum
等聚合函数,但是不能嵌套子aggs; - Pipeline,以其他aggs的结果作为输入,而不是直接在文档集合上进行操作。
ES的Query DSL功能实在是强大,在本文短短的篇幅中很难阐述完全,故只列举了两个简单实例。在以前的项目中,我使用过1.7
版本ES,后来发现2.0.0-beta1
版本及之后DSL语法发生很大的变化,比如filtered
、and
、or
等被废弃掉了,而被bool
取而代之;对应的Java API支持链式操作,与Java 8配合写起来非常舒服。
REST通过_search
接口进行DSL查询:
$ curl -XGET 'localhost:9200//_search?pretty' -d'{ }'
实战:List<List<String>> idsList
作为过滤条件,其中内一层为and关系、内二层为or关系;然后多字段(为bucketSizeMap
的key)aggs,Java 8实现:
BoolQueryBuilder mustQueryBuilder = boolQuery();if (!(idsList.size() == 1 && idsList.get(0).isEmpty())) { mustQueryBuilder = idsList.stream().reduce( boolQuery(), (mustQB, ids) -> { BoolQueryBuilder shouldQB = ids.stream().reduce(boolQuery(), (qb, id) -> qb.should(termQuery(SearchSystem.getEsType(id, idMap), id)), BoolQueryBuilder::should); return mustQB.must(shouldQB); }, BoolQueryBuilder::must);}SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(mustQueryBuilder);for (Map.Entryentry : bucketSizeMap.entrySet()) { AggregationBuilder aggregationBuilder = AggregationBuilders .terms(entry.getKey()) .field(entry.getKey()).size(entry.getValue()); searchRequestBuilder.addAggregation(aggregationBuilder);}SearchResponse response = searchRequestBuilder.execute().actionGet();
Bucket Aggregations支持filter aggs,即满足过滤条件后做aggs,
aggs:: filter: aggs:
其与filter query + aggs在功能上是等价的,
query: bool: filter:aggs:
但是,经测试发现filter query + aggs是比filter aggs查询要快。
Update
update为document级别的操作,即仅支持对某个具体document进行更新;REST通过_update
接口:
$ curl -XPOST 'localhost:9200/<_index>/<_type>/<_id>/_update' -d '{}'
Java API则有两种实现方式:UpdateRequest
+ update
与prepareUpdate
,
// case 1UpdateRequest updateRequest = new UpdateRequest();updateRequest.index("index");updateRequest.type("type");updateRequest.id("1");updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject());client.update(updateRequest).get();// case 2client.prepareUpdate("ttl", "doc", "1") .setDoc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .get();
Delete
delete操作通常都伴随着检查index是否存在(exist),exist的RESTful接口与Java API分别如下:
$ curl -XHEAD -i 'http://localhost:9200/twitter'
client.admin().indices() .prepareExists(indexName) .execute().actionGet().isExists();
ES提供了三种粗细粒度的删除操作:
- 删除整个index;
- 删除index中某一type;
- 删除特定的document.
RESTful接口:
-- delete complete index$ curl -XDELETE 'http://localhost:9200/'-- delete a type in index$ curl -XDELETE 'http://localhost:9200/ / '-- delete a particular document$ curl -XDELETE 'http://localhost:9200/ / /
Java API实现:
// delete complete indexclient.admin().indices().delete(new DeleteIndexRequest("")).actionGet();// delete a type in indexclient.prepareDelete().setIndex(" ").setType(" ").setId("*").execute().actionGet();// delete a particular documentclient.prepareDelete().setIndex(" ").setType(" ").setId(" ").execute().actionGet();// orDeleteResponse response = client.prepareDelete("twitter", "tweet", "1") .execute() .actionGet();