面向对象
使用inject库来实现依赖注入
import boto3
from aws_requests_auth.aws_auth import AWSRequestsAuth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import inject
def init_es(region: str, domain: str) -> Elasticsearch:
_credentials = boto3.Session().get_credentials()
_awsauth = AWSRequestsAuth(
aws_region=region,
aws_service="es",
aws_access_key=_credentials.access_key,
aws_secret_access_key=_credentials.secret_key,
aws_token=_credentials.token,
aws_host=domain
)
return Elasticsearch(
host=domain,
http_auth=_awsauth,
port=443,
use_ssl=True,
connection_class=RequestsHttpConnection,
verify_certs=False
)
class LogES(Elasticsearch):
"""
Why using the empty class?
if there was multiple ES connection, each connection will use one empty class,
by using this way to confirm the ES connection can be only init once.
"""
pass
@inject.configure
def config(binder: inject.Binder):
es_init = init_es(domain="abc.es.amazonaws.com", region="ap-northeast-1")
binder.bind(LogES, es_init)
Last updated