Python에서 ElasticSearch를 사용하기 위해 elasticsearch7 패키지를 설치해야 한다.
pip install elasticsearch7
from elasticsearch7 import ElasticSearch
Elasticsearch 클래스 인스턴스가 드라이버나 클라이언트 역할을 하지만, 여러 가지 기능을 사용할 때마다 Elasticsearch 클래스 인스턴스 변수와 인덱스 이름을 전달해야 하는 번거로움이 있어서 ESClient라는 클래스로 wrapping하였다.
class ESClient:
def __init__(self, index_name):
self.es = Elasticsearch()
self.index_name = index_name
인덱스를 생성할 때 settings 값과 mappings 값을 전달해야 한다.
Elasticsearch 클래스 인스턴스인 es의 indices API의 create() 메소드를 이용하여 인덱스를 생성한다.
es.indices.create(index_name, settings={...}, mappings={...})
한글을 데이터로 사용할 예정이므로 nori_analyzer를 사용하겠다고 설정한다.
한글로 된 파일의 디렉토리 명, 이름, 확장자, 사이즈 등의 메타 데이터를 ElasticSearch에 적재하고, 이름과 확장자, 사이즈 등을 검색 필드로 사용할 예정이다. keyword 타입은 정확일치 검색, text는 형태소 분석에 기반한 검색이라고 할 수 있다. 특히, 이름 필드는 nori_analyzer를 사용하겠다고 선언한다.
ESClent 클래스의 create_index() 메소드를 작성한다.
def create_index(self):
print("Creating index ...")
settings = {
"index": {
"analysis": {
"analyzer": {
"nori_analyzer": {
"tokenizer": "nori_tokenizer"
}
}
}
}
}
mappings = {
"properties": {
"dir": {
"type": "keyword",
},
"name": {
"type": "text",
"analyzer": "nori_analyzer"
},
"ext": {
"type": "keyword"
},
"size": {
"type": "unsigned_long"
}
}
}
self.es.indices.create(index=self.index_name, settings=settings, mappings=mappings)
ElasticSearch에 저장할 수 있는 데이터는 JSON 포맷이기만 하면 내부 구조는 개발자가 원하는 대로 정의할 수 있다.
return {"32892": {"dir": "/data/project1/", "name": "file1", "ext": "txt", "size": 1024} }
아래 예제는 특정 TSV 파일에서 여러 파일들의 경로와 사이즈 정보를 읽어다가 디렉토리 명, 파일명, 확장자등을 분해하여 dictionary 타입의 변수로 만드는 것이다. 파일시스템에서 개별 파일의 unique ID로 사용할 수 있는 inode number를 key로 선택했다.
def read_data(tsv_file_path: Path) -> Dict[int, str]:
data = {}
with tsv_file_path.open("r", encoding="utf-8") as infile:
for line in infile:
line = line.strip()
if not line:
continue
inode_num, file_path_str, size = line.split("\t")
dir_path_str = Path(file_path_str).parent.name
name = Path(file_path_str).stem
ext = Path(file_path_str).suffix[1:]
data[int(inode_num)] = {"dir": dir_path_str, "name": name, "ext": ext, "size": int(size)}
return data
이 코드는 ElasticSearch 사용에 핵심적인 코드가 아니다. 개발자가 원하는 형태의 데이터를 미리 준비해놓는 단계가 필요할 뿐이다.
bulk API를 이용하여 적재한다.
es.bulk(body=es_data, request_timeout=60, refresh=True)
refresh 옵션을 설정하면 적재 후에 통계치를 갱신해준다. 적재 후 문서의 건수를 정확하게 카운트하려면 refresh가 필요하다. (적재 작업의 마지막에서 한 번만 해주는 게 좋다.)
ESClent 클래스의 load() 메소드를 작성한다.
def load(self, data: Dict[int, str]) -> None:
print("Loading data ...")
es_data = []
for inode_num, path_and_size in data.items():
es_data.append({"index": {"_index": self.index_name, "_id": inode_num}})
es_data.append(path_and_size)
print(len(es_data))
response = self.es.bulk(body=es_data, request_timeout=60, refresh=True)
검색은 search API를 사용한다. 검색 필드를 다루는 방법을 설정으로 전달해야 한다.
가장 단순한 검색 방법은 다음과 같이 검색 키워드를 전달하는 것이다.
query = {
'query': {
'match': {
'field_name': 'search_keyword'
}
}
}
es.search(index=index_name, query=query)
필드 별로 가중치를 주는 것을 부스팅이라고 하는데 다음과 같이 설정할 수 있다.
query = {
"bool": {
"should": [
{"match": {"name": {"query": keyword, "boost": 3}}},
{"match": {"ext": {"query": ext, "boost": 1}}},
{"match": {"size": {"query": size, "boost": 2}}}
]
}
}
response = es.search(index=self.index_name, query=query, scroll='10m')
검색 결과 JSON 데이터 중에서 hits 필드 부분이 중요하다.
{
'took': 10,
'timed_out': False,
'_shards': {
'total': 5,
'successful': 5,
'skipped': 0,
'failed': 0
},
'hits': {
'total': {
'value': 1,
'relation': 'eq'
},
'max_score': 1.0,
'hits': [
{
'_index': 'my_index',
'_type': 'my_doc',
'_id': '1',
'_score': 1.0,
'_source': {
'field_name': 'search_keyword',
'other_field': 'other_value'
}
}
]
}
}
response["hits"]["hits"] 값이 실제 검색 결과에 해당한다. 검색 결과는 키워드와 문서 사이의 일치여부를 TF-IDF 방식으로 계산하고 필드 별 가중치로 부스팅한 점수를 계산해서 그걸 기준으로 정렬한 것이다.
def search(self, keyword: str, ext: str = "", size: int = 0) -> None:
print(f"Searching with keyword '{keyword}' ...")
query = {
"bool": {
"should": [
{"match": {"name": {"query": keyword, "boost": 1.2 + math.log2(len(keyword.split(' ')))}}},
{"match": {"ext": {"query": ext, "boost": 1}}},
{"match": {"size": {"query": size, "boost": 1}}}
]
}
}
response = self.es.search(index=self.index_name, query=query, scroll='10m')
for hit in response['hits']['hits']:
print(hit)
scroll_id = response['_scroll_id']
scroll_size = response['hits']['total']['value']
while scroll_size > 0:
response = self.es.scroll(scroll_id=scroll_id, scroll='10m')
for hit in response['hits']['hits']:
print(hit)
scroll_id = response['_scroll_id']
scroll_size = len(response['hits']['hits'])
scroll 기능을 이용하여 pagination을 구현했다. 자세한 내용은 curl을_이용한_간단한_ElasticSearch_사용 문서를 참고하면 된다.
response["hits"]["max_score"]와 reponse["hits"]["hits"][0]["_score"]를 비교하여 스코어를 normalize할 수도 있다.
max_score = response['hits']['max_score']
for hit in response['hits']['hits']:
normalized_score = hit['_score'] * 100 / max_score
print(hit['_source'], normalized_score)
#!/usr/bin/env python
import sys
import math
from pathlib import Path
from typing import Dict, Any
from elasticsearch7 import Elasticsearch, RequestError
from pprint import pprint
def read_data(tsv_file_path: Path) -> Dict[int, str]:
data = {}
with tsv_file_path.open("r", encoding="utf-8") as infile:
for line in infile:
line = line.strip()
if not line:
continue
inode_num, file_path_str, size = line.split("\t")
dir_path_str = Path(file_path_str).parent.name
name = Path(file_path_str).stem
ext = Path(file_path_str).suffix[1:]
data[int(inode_num)] = {"dir": dir_path_str, "name": name, "ext": ext, "size": int(size)}
return data
class ESClient:
def __init__(self, index_name):
self.es = Elasticsearch()
self.index_name = index_name
def __del__(self):
del self.es
del self.index_name
def create_index(self):
print("Creating index ...")
settings = {
"index": {
"analysis": {
"analyzer": {
"nori_analyzer": {
"tokenizer": "nori_tokenizer"
}
}
}
}
}
mappings = {
"properties": {
"dir": {
"type": "keyword",
},
"name": {
"type": "text",
"analyzer": "nori_analyzer"
},
"ext": {
"type": "keyword"
},
"size": {
"type": "unsigned_long"
}
}
}
self.es.indices.create(index=self.index_name, settings=settings, mappings=mappings)
def load(self, data: Dict[int, str]) -> None:
print("Loading data ...")
es_data = []
for inode_num, path_and_size in data.items():
es_data.append({"index": {"_index": self.index_name, "_id": inode_num}})
es_data.append(path_and_size)
print(len(es_data))
response = self.es.bulk(body=es_data, request_timeout=60, refresh=True)
# print(response)
def search(self, keyword: str, ext: str = "", size: int = 0) -> None:
print(f"Searching with keyword '{keyword}' ...")
# query = {
# "match": {
# "name": keyword
# }
# }
query = {
"bool": {
"should": [
{"match": {"name": {"query": keyword, "boost": 1.2 + math.log2(len(keyword.split(' ')))}}},
{"match": {"ext": {"query": ext, "boost": 1}}},
{"match": {"size": {"query": size, "boost": 1}}}
]
}
}
response = self.es.search(index=self.index_name, query=query, scroll='10m')
total_hits = response['hits']['total']['value']
max_score = response['hits']['max_score']
for hit in response['hits']['hits']:
normalized_score = hit['_score'] * 100 / max_score
print(hit['_source'], normalized_score)
scroll_id = response['_scroll_id']
scroll_size = response['hits']['total']['value']
while scroll_size > 0:
response = self.es.scroll(scroll_id=scroll_id, scroll='10m')
total_hits = response['hits']['total']['value']
max_score = response['hits']['max_score']
for hit in response['hits']['hits']:
normalized_score = hit['_score'] * 100 / max_score
print(hit['_source'], normalized_score)
scroll_id = response['_scroll_id']
scroll_size = len(response['hits']['hits'])
def main() -> int:
tsv_file = sys.argv[1]
keyword = ""
if len(sys.argv) > 2:
keyword = sys.argv[2]
ext = ""
if len(sys.argv) > 3:
ext = sys.argv[3]
size = 0
if len(sys.argv) > 4:
size = sys.argv[4]
tsv_file_path = Path(tsv_file)
data = read_data(tsv_file_path)
es_client = ESClient("tm")
try:
es_client.create_index()
es_client.load(data)
except RequestError as e:
print(e)
es_client.search(keyword, ext, size)
return 0
if __name__ == "__main__":
sys.exit(main())
Python 스크립트는 다음과 같이 실행할 수 있고, 첫번째 아규먼트까지만 필수이다. 검색 필드로 사용하는 파일명이나 size는 생략 가능하다.
python load_and_search.py my_tsv_file.tsv 키워드 1024
한 번 생성되고 데이터가 적재된 인덱스는 반복적으로 생성/적재되지 않는다. 재적재가 필요하면 다음 명령을 실행해주자.
curl -X DELETE localost:9200/tm/