基于Java对员工信息进行聚合分析

基于Java对员工信息进行聚合分析

API 语法:

1
2
3
4
5
6
7
8
9
10
// API syntax example: a search request carrying a three-level nested aggregation.
SearchResponse sr = node.client().prepareSearch()
.addAggregation(
// Level 1: terms aggregation named "by_country" on the "country" field.
AggregationBuilders.terms("by_country").field("country")
// Level 2: date histogram named "by_year" on "dateOfBirth", attached as a sub-aggregation of the terms aggregation.
.subAggregation(AggregationBuilders.dateHistogram("by_year")
.field("dateOfBirth")
.dateHistogramInterval(DateHistogramInterval.YEAR)
// Level 3: avg metric named "avg_children" on "children", attached as a sub-aggregation of the date histogram.
.subAggregation(AggregationBuilders.avg("avg_children").field("children"))
)
)
// Execute the request and obtain the SearchResponse.
.execute().actionGet();

我们先给个需求:

(1)首先按照country国家来进行分组
(2)然后在每个country分组内,再按照入职年份(join_date,按年划分)进行分组
(3)最后计算每个分组内的平均薪资

PUT /company
{
  "mappings": {
    "employee": {
      "properties": {
        "age": {
          "type": "long"
        },
        "country": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          },
          "fielddata": true
        },
        "join_date": {
          "type": "date"
        },
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "position": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "salary": {
          "type": "long"
        }
      }
    }
  }
}

GET /company/employee/_search
{
  "size": 0,
  "aggs": {
    "group_by_country": {
      "terms": {
        "field": "country"
      },
      "aggs": {
        "group_by_join_date": {
          "date_histogram": {
            "field": "join_date",
            "interval": "year"
          },
          "aggs": {
            "avg_salary": {
              "avg": {
                "field": "salary"
              }
            }
          }
        }
      }
    }
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
	// Excerpt: walks the nested aggregation results of searchResponse (declared outside this excerpt).
	Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
// Top level: the terms aggregation registered under the name "group_by_country".
StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
while(groupByCountryBucketIterator.hasNext()) {
Bucket groupByCountryBucket = groupByCountryBucketIterator.next();

// Print the bucket key and its document count, separated by a tab.
System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());

// Second level: the histogram nested inside this country bucket under "group_by_join_date".
Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();

while(groupByJoinDateBucketIterator.hasNext()) {
org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();

// Print the histogram bucket key and its document count, separated by a tab.
System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());

// Third level: the avg metric nested inside this histogram bucket under "avg_salary".
Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
System.out.println(avgSalary.getValue());
}
}

// client is declared outside this excerpt.
client.close();
// NOTE(review): presumably closes the enclosing method, whose header is not part of this excerpt.
}

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import java.net.InetAddress;
import java.util.Iterator;
import java.util.Map;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
* 员工聚合分析应用程序
* @author Administrator
*
*/
public class EmployeeAggrApp {

@SuppressWarnings({ "unchecked", "resource" })
public static void main(String[] args) throws Exception {
Settings settings = Settings.builder()
.put("cluster.name", "elasticsearch")
.build();

TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

SearchResponse searchResponse = client.prepareSearch("company")
.addAggregation(AggregationBuilders.terms("group_by_country").field("country")
.subAggregation(AggregationBuilders
.dateHistogram("group_by_join_date")
.field("join_date")
.dateHistogramInterval(DateHistogramInterval.YEAR)
.subAggregation(AggregationBuilders.avg("avg_salary").field("salary")))
)
.execute().actionGet();

Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();

StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
while(groupByCountryBucketIterator.hasNext()) {
Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
System.out.println(groupByCountryBucket.getKey() + ":" + groupByCountryBucket.getDocCount());

Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
while(groupByJoinDateBucketIterator.hasNext()) {
org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
System.out.println(groupByJoinDateBucket.getKey() + ":" +groupByJoinDateBucket.getDocCount());

Avg avg = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
System.out.println(avg.getValue());
}
}

client.close();
}

}
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • © 2020 John Doe
  • Powered by Hexo Theme Ayer
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信