はじめに
CSVやPDF, Excel などの様々なデータソースを ElasticSearch に溜めて、指定の文言が含まれているファイルを検索できる簡易的なドキュメント検索Webアプリケーションを作成しました。
今回のソースコードは下記にプッシュしています。
機能、動作イメージ
ドキュメント検索機能
- 任意の文字列を入力して検索を押下
- 検索文字列が含まれるドキュメントがある場合、そのファイル名とファイル内のテキスト(200文字まで)を検索結果として表示する
ドキュメントアップロード機能
- 選択ボタンをクリックするとファイル選択ダイアログが立ち上がり、任意のドキュメントを選択
- 送信ボタンをクリックすると選択したドキュメントが ElasticSearch に登録される
※今回は、CSV, PDF, Excel ファイルいずれかのみ受け付ける形にしています
使用技術
- フロントエンド
- React v18.2.0
- TypeScript v4.9.5
- バックエンド
- Python v3.8.10
- FastAPI v0.110.0
- ElasticSearch v7.5.1
- Docker v20.10.17
実装詳細ーバックエンド
以下で ElasticSearch とのつなぎこみ箇所を中心にソースコードを添付しています。
ElasticSearch
ElasticSearch のDockerイメージから環作成しています。
docker-compose.yml
version: "3"
services:
sysctl:
image: alpine
container_name: sysctl
privileged: true
networks:
- esnet
es01:
build:
context: .
dockerfile: Dockerfile
container_name: es01
environment:
- node.name=es01
- cluster.initial_master_nodes=es01
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- esdata01:/usr/share/elasticsearch/data
- 9200:9200
networks:
- esnet
depends_on:
- sysctl
volumes:
esdata01:
driver: local
networks:
esnet:
- esdata01 というボリュームを作成してマウントしています
- ElasticSearch を起動後、メモリ割り当てが足りない旨のエラーが出てそのまま停止してしまう事象が発生したため、 vm.max_map_count=262144 でメモリ割り当てを増やして停止しないようにしています
Dockerfile
FROM docker.elastic.co/elasticsearch/elasticsearch:7.5.1
RUN elasticsearch-plugin install analysis-kuromoji
- ElasticSearch コンテナはこちらで指定しています
- 今回日本語も扱うため、kuromojiプラグイン を追加しています
バックエンド
リクエストハンドリング
@app.get("/search", response_model=SearchDocumentResponse)
def search(text: Optional[str] = Query(None, description="検索クエリー")):
return search_document(text)
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
result = await upload_document(file)
return JSONResponse(content={"result": result})
- FastAPI を使い、「ドキュメント検索」と「ドキュメントアップロード」機能用のエンドポイントを定義しています
- 検索エンドポイントは検索文字列を受け取る必要があるため、text というクエリーパラメーターで受け取るようにしています
- ドキュメントアップロードで送信されたファイルデータは、UploadFile というクラスで受け取ることができます
ElasticSearch のドキュメント検索
import re
import requests
from models.search_document import SearchDocumentResponse, SearchDocumentResult
def search_document(text: str):
try:
# Elasticsearchへのクエリ実行
if is_alphanumeric(text):
# 半角英数字の場合
else:
# 日本語の場合
query = {"query": {"match_phrase": {"doc.text": text}}}
# ステータスコードが200以外の場合はエラーを出力して終了
if response.status_code != 200:
print(
f"Error: Failed to retrieve data from Elasticsearch. Status code: {response.status_code}"
)
return SearchDocumentResponse(count=0, result=)
# レスポンスからhitsを取得
# 検索結果を格納するリスト
search_results =
# レスポンスのhitsに対してループ処理
for hit in data:
file_name = hit["_source"]["doc"]["name"]
# textがリストか文字列かで処理を分岐
text_content = hit["_source"]["doc"]["text"]
if isinstance(text_content, list):
# textがリストの場合、各要素を連結して1つの文字列にする
text_content = "\n".join(text_content)
search_results.append(
SearchDocumentResult(file_name=file_name, text=text_content)
)
# 検索結果をSearchDocumentResponseに格納して返す
return SearchDocumentResponse(count=len(search_results), result=search_results)
except Exception as e:
print(
f"Error: An error occurred while processing the response from Elasticsearch: {str(e)}"
)
return SearchDocumentResponse(count=0, result=)
def is_alphanumeric(text):
pattern = re.compile(r"^[a-zA-Z0-9]*$")
return bool(pattern.match(text))
- 検索文字列を text 変数として受け取り、ElasticSearch への検索クエリーを構築します
- 日本語と英数字両方に対応している検索クエリーが見当たらかなったため、日本語と英数字で投げるクエリーを動的に変えています(いずれも部分一致検索で指定の文字列が含まれるか検索する)
- 複数件ヒットする可能性があるため、配列に検索結果を詰めて返却しています
ElasticSearch にドキュメント登録
async def upload_document(file: UploadFile = File(...)):
content_type = file.content_type
if content_type == FILE_TYPE_PDF:
await upload_pdf(file)
else:
print("not found")
pass
ElasticSearch にドキュメント登録(CSV)
import os
from fastapi import File, UploadFile
import requests
from constant.constant import (
ELASTIC_SEARCH_REQUEST_HEADERS,
ELASTIC_SEARCH_URL,
)
try:
file_name = file.filename
file_path = os.path.join(os.getcwd(), file_name)
# アップロードされたファイルを保存
with open(file_path, "wb") as f:
f.write(await file.read())
file_content =
reader = DictReader(file)
for row in reader:
# Elasticsearchに送信するデータを構築
data = {"doc": {"name": file_name, "text": file_content}}
print("Uploaded file data:", data)
response = requests.post(
ELASTIC_SEARCH_URL,
headers=ELASTIC_SEARCH_REQUEST_HEADERS,
)
if response.status_code == 201:
print("Document indexed successfully.")
else:
print(f"Failed to index document. Status code: {response.text}")
return False
# 一時ファイルを削除
os.unlink(file_path)
return True
except Exception as e:
print(
)
return False
- 引数の UploadFile からファイル名とファイル内容を読み込みます
- ファイル内容の読み込み部分は、いったん Temp ファイルに退避しておいて、Temp ファイルからファイル内容を文字列に読み込んでいます(直接 UploadFile から読み込んで ElasticSearch に送信したところエラーになったため。。)
- name と text というフィールドを持ったドキュメントを登録しています。(※事前に book というインデックスを作成しこれを使用しています)
ElasticSearch にドキュメント登録(Excel)
import os
from fastapi import File, UploadFile
import pandas as pd
import requests
from constant.constant import ELASTIC_SEARCH_REQUEST_HEADERS, ELASTIC_SEARCH_URL
try:
file_name = file.filename
file_path = os.path.join(os.getcwd(), file_name)
# アップロードされたファイルを保存
with open(file_path, "wb") as f:
f.write(await file.read())
# ブック全体の内容を入れるための空のリストを作成
book_content = []
# Excelファイルから各シートのデータを読み込んでリストに追加する
with pd.ExcelFile(file_path) as xls:
for sheet_name in xls.sheet_names:
records = df.to_dict(orient="records")
book_content.extend(records)
# Elasticsearchに送信するデータを構築
data = {
"doc": {
"name": file_name,
}
}
print("Uploaded file data:", data)
response = requests.post(
ELASTIC_SEARCH_URL,
headers=ELASTIC_SEARCH_REQUEST_HEADERS,
)
if response.status_code == 201:
print("Document indexed successfully.")
else:
print(f"Failed to index document. Status code: {response.text}")
return False
# 一時ファイルを削除
os.unlink(file_path)
return True
except Exception as e:
print(
)
return False
- CSVの処理同様に、ファイル名とファイル内容を読み込んでいます
- pandas を使いシートごとに読み込んだ内容を配列に読み込み、最終的に1つの文字列に変換して ElasticSearch に登録します
ElasticSearch にドキュメント登録(PDF)
import os
from tempfile import NamedTemporaryFile
from fastapi import File, UploadFile
from pdfminer.high_level import extract_text
import requests
from constant.constant import ELASTIC_SEARCH_REQUEST_HEADERS, ELASTIC_SEARCH_URL
async def upload_pdf(file: UploadFile = File(...)):
try:
file_name = file.filename
file_path = os.path.join(os.getcwd(), file_name)
# アップロードされたファイルを保存
with open(file_path, "wb") as f:
f.write(await file.read())
# PDFからテキストを抽出
file_text = extract_text(file_path)
# Elasticsearchに送信するデータを構築
data = {"doc": {"name": file_name, "text": file_text}}
print("Uploaded file data:", data)
response = requests.post(
ELASTIC_SEARCH_URL,
headers=ELASTIC_SEARCH_REQUEST_HEADERS,
)
if response.status_code == 201:
print("Document indexed successfully.")
else:
print(f"Failed to index document. Status code: {response.text}")
return False
# 一時ファイルを削除
os.unlink(file_path)
return True
except Exception as e:
print(
f"Error: An error occurred while processing uploading pdf to Elasticsearch: {str(e)}"
)
return False
- PDF の読み込みは pdfminer がシンプルに書けて使いやすかったので採用しました
- こちらもCSV, Excel の処理と同じ形式で、ファイル名とファイル内容を読み込んで ElasticSearch に登録します
以上のような実装で ElasticSearch へドキュメントの登録と検索を実現することができました。
※ elasticsearch · PyPI があり、これを最初試していたもののDocker起動している ElasticSearch にうまく接続ができず、今回は HTTP リクエストで登録する方式を取りました
おわりに
今回はCSV, Excel, PDF のみでしたが、他にもあらゆる形式のドキュメントを ElascticSearch に溜めこむことができると思いました。
これを応用して例えば、自組織のファイルを ElascticSearch にためていって、任意の文章で検索して目的のファイルを探す、みたいな使い方もできそうですね。