debeir.core.indexer
import abc
import threading
from queue import Empty, Queue
from typing import List

from debeir.rankers.transformer_sent_encoder import Encoder
from debeir.utils.utils import remove_excess_whitespace
from elasticsearch import Elasticsearch


class Indexer:
    """
    Base class for indexers: stores the client and declares ``get_field``.

    NOTE: ``get_field`` is decorated with ``abc.abstractmethod`` but this class
    does not use ``abc.ABCMeta``, so the abstract marker is not enforced at
    instantiation time.
    """

    def __init__(self, client):
        # Cooperative call so that further bases of a subclass (for example
        # threading.Thread) are initialised too.
        super().__init__()
        self.client = client

    @abc.abstractmethod
    def get_field(self, document, field):
        """Return the text to index for ``field`` of ``document``."""
        pass


class SemanticElasticsearchIndexer(Indexer, threading.Thread):
    """
    Create a NIR-style index, with dense field representations produced by the
    provided sentence encoder.

    Assumes the documents have already been indexed to start with. Each
    instance is a thread that drains documents from a queue and updates them
    in place.
    """

    def __init__(self, es_client: Elasticsearch, encoder: Encoder, index: str,
                 fields_to_encode: List[str], queue: Queue):
        """
        Store the collaborators and register the derived-field mappings.

        :param es_client: Elasticsearch client used for mapping and update calls
        :param encoder: Sentence encoder; its ``encode`` method is called per field
        :param index: Name of the index to update
        :param fields_to_encode: Source fields to encode
        :param queue: Queue of documents (dicts with ``_id`` and ``_source``)
        """
        super().__init__(es_client)
        self.encoder = encoder
        self.index = index
        self.fields = fields_to_encode
        self.q = queue
        # Issues a put_mapping request as soon as the indexer is constructed.
        self.update_mappings(self.index, self.fields, self.client)

    @classmethod
    def update_mappings(cls, index, fields, client: Elasticsearch, dims: int = 768):
        """
        Add ``<field>_Embedding`` and ``<field>_Text`` properties to the index mapping.

        :param index: Name of the index whose mapping is updated
        :param fields: Source field names to derive the new properties from
        :param client: Elasticsearch client used to send the mapping
        :param dims: Dimensionality declared for every ``dense_vector`` property;
            the default of 768 is the value that used to be hard-coded
        """
        mapping = {}

        for field in fields:
            # A fresh dict per field, so the properties never alias each other.
            mapping[field + "_Embedding"] = {"type": "dense_vector", "dims": dims}
            mapping[field + "_Text"] = {"type": "text"}

        client.indices.put_mapping(
            body={
                "properties": mapping
            }, index=index)

    # Disabled async variant retained from the original source.
    # async def create_index(self, document_itr=None):
    #    await self._update_mappings()

    #    if document_itr is None:
    #        document_itr = helpers.async_scan(self.es_client, index=self.index)

    #    bar = tqdm(desc="Indexing", total=35_000)

    #    async for document in document_itr:
    #        doc = document["_source"]
    #        await self.index_document(doc)

    #        bar.update(1)

    def get_field(self, document, field):
        """
        Return the cleaned text of ``field``, or ``False`` if there is nothing to do.

        ``False`` is returned when ``field`` is missing from ``document`` or when
        ``<field>_Text`` is already present with a value other than ``0``.

        :param document: The document source (a dict)
        :param field: Name of the field to extract
        :return: Text passed through ``remove_excess_whitespace``, or ``False``
        """
        if field not in document:
            return False

        # This key was previously the literal "f{field}_Text" (the f prefix sat
        # inside the quotes), so the already-processed check could never match.
        text_key = f"{field}_Text"
        if text_key in document and document[text_key] != 0:
            return False

        value = document[field]

        # Only dicts are treated as containers: a plain string that merely
        # contains the word "Textblock" must not be indexed with a string key.
        if isinstance(value, dict) and 'Textblock' in value:
            return remove_excess_whitespace(value['Textblock'])

        return remove_excess_whitespace(value)

    def index_document(self, document):
        """
        Encode the configured fields of one document and update it in the index.

        :param document: A hit-style dict with ``_id`` and ``_source`` keys
        """
        update_doc = {}
        doc = document["_source"]

        for field in self.fields:
            text_field = self.get_field(doc, field)

            if text_field:
                embedding = self.encoder.encode(topic=text_field, disable_cache=True)
                update_doc[f"{field}_Embedding"] = embedding
                update_doc[f"{field}_Text"] = text_field

        # Skip the request entirely when no field produced any text.
        if update_doc:
            self.client.update(index=self.index,
                               id=document['_id'],
                               doc=update_doc)

    def run(self):
        """
        Thread body: index queued documents until the queue is drained.

        ``get_nowait`` is used instead of an ``empty()`` check followed by a
        blocking ``get()``: with several workers on one queue, another worker
        can take the last item between the two calls and leave this thread
        blocked forever.
        """
        while True:
            try:
                document = self.q.get_nowait()
            except Empty:
                break

            self.index_document(document)
class SemanticElasticsearchIndexer(Indexer, threading.Thread):
    """
    Create a NIR-style index, with dense field representations produced by the
    provided sentence encoder.

    Assumes the documents have already been indexed to start with. Each
    instance is a thread that drains documents from a queue and updates them
    in place.
    """

    def __init__(self, es_client: Elasticsearch, encoder: Encoder, index: str,
                 fields_to_encode: List[str], queue: Queue):
        """
        Store the collaborators and register the derived-field mappings.

        :param es_client: Elasticsearch client used for mapping and update calls
        :param encoder: Sentence encoder; its ``encode`` method is called per field
        :param index: Name of the index to update
        :param fields_to_encode: Source fields to encode
        :param queue: Queue of documents (dicts with ``_id`` and ``_source``)
        """
        super().__init__(es_client)
        self.encoder = encoder
        self.index = index
        self.fields = fields_to_encode
        self.q = queue
        # Issues a put_mapping request as soon as the indexer is constructed.
        self.update_mappings(self.index, self.fields, self.client)

    @classmethod
    def update_mappings(cls, index, fields, client: Elasticsearch, dims: int = 768):
        """
        Add ``<field>_Embedding`` and ``<field>_Text`` properties to the index mapping.

        :param index: Name of the index whose mapping is updated
        :param fields: Source field names to derive the new properties from
        :param client: Elasticsearch client used to send the mapping
        :param dims: Dimensionality declared for every ``dense_vector`` property;
            the default of 768 is the value that used to be hard-coded
        """
        mapping = {}

        for field in fields:
            # A fresh dict per field, so the properties never alias each other.
            mapping[field + "_Embedding"] = {"type": "dense_vector", "dims": dims}
            mapping[field + "_Text"] = {"type": "text"}

        client.indices.put_mapping(
            body={
                "properties": mapping
            }, index=index)

    # Disabled async variant retained from the original source.
    # async def create_index(self, document_itr=None):
    #    await self._update_mappings()

    #    if document_itr is None:
    #        document_itr = helpers.async_scan(self.es_client, index=self.index)

    #    bar = tqdm(desc="Indexing", total=35_000)

    #    async for document in document_itr:
    #        doc = document["_source"]
    #        await self.index_document(doc)

    #        bar.update(1)

    def get_field(self, document, field):
        """
        Return the cleaned text of ``field``, or ``False`` if there is nothing to do.

        ``False`` is returned when ``field`` is missing from ``document`` or when
        ``<field>_Text`` is already present with a value other than ``0``.

        :param document: The document source (a dict)
        :param field: Name of the field to extract
        :return: Text passed through ``remove_excess_whitespace``, or ``False``
        """
        if field not in document:
            return False

        # This key was previously the literal "f{field}_Text" (the f prefix sat
        # inside the quotes), so the already-processed check could never match.
        text_key = f"{field}_Text"
        if text_key in document and document[text_key] != 0:
            return False

        value = document[field]

        # Only dicts are treated as containers: a plain string that merely
        # contains the word "Textblock" must not be indexed with a string key.
        if isinstance(value, dict) and 'Textblock' in value:
            return remove_excess_whitespace(value['Textblock'])

        return remove_excess_whitespace(value)

    def index_document(self, document):
        """
        Encode the configured fields of one document and update it in the index.

        :param document: A hit-style dict with ``_id`` and ``_source`` keys
        """
        update_doc = {}
        doc = document["_source"]

        for field in self.fields:
            text_field = self.get_field(doc, field)

            if text_field:
                embedding = self.encoder.encode(topic=text_field, disable_cache=True)
                update_doc[f"{field}_Embedding"] = embedding
                update_doc[f"{field}_Text"] = text_field

        # Skip the request entirely when no field produced any text.
        if update_doc:
            self.client.update(index=self.index,
                               id=document['_id'],
                               doc=update_doc)

    def run(self):
        """
        Thread body: index queued documents until the queue is drained.

        ``get_nowait`` is used instead of an ``empty()`` check followed by a
        blocking ``get()``: with several workers on one queue, another worker
        can take the last item between the two calls and leave this thread
        blocked forever.
        """
        # Imported locally so this class does not depend on a module-level
        # import of ``Empty``.
        from queue import Empty

        while True:
            try:
                document = self.q.get_nowait()
            except Empty:
                break

            self.index_document(document)
Create a NIR-style index with dense field representations produced by the provided sentence encoder. Assumes the documents have already been indexed to start with.
def __init__(self, es_client: Elasticsearch, encoder: Encoder, index: str,
             fields_to_encode: List[str], queue: Queue):
    """
    Set up the indexer and register the derived-field mappings.

    :param es_client: Elasticsearch client, forwarded to the base initialiser
    :param encoder: Sentence encoder kept on ``self.encoder``
    :param index: Name of the index to update
    :param fields_to_encode: Source fields to encode, kept on ``self.fields``
    :param queue: Queue of documents to process, kept on ``self.q``
    """
    # The base initialiser stores the client on ``self.client``.
    super().__init__(es_client)

    self.q = queue
    self.fields = fields_to_encode
    self.index = index
    self.encoder = encoder

    # Sends the mapping update immediately, i.e. at construction time.
    self.update_mappings(self.index, self.fields, self.client)
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
@classmethod
def update_mappings(cls, index, fields, client: Elasticsearch, dims: int = 768):
    """
    Add ``<field>_Embedding`` and ``<field>_Text`` properties to the index mapping.

    :param index: Name of the index whose mapping is updated
    :param fields: Source field names to derive the new properties from
    :param client: Elasticsearch client used to send the mapping
    :param dims: Dimensionality declared for every ``dense_vector`` property;
        the default of 768 is the value that used to be hard-coded
    """
    mapping = {}

    for field in fields:
        # A fresh dict per field, so the properties never alias each other.
        mapping[field + "_Embedding"] = {"type": "dense_vector", "dims": dims}
        mapping[field + "_Text"] = {"type": "text"}

    client.indices.put_mapping(
        body={
            "properties": mapping
        }, index=index)
def get_field(self, document, field):
    """
    Return the cleaned text of ``field``, or ``False`` if there is nothing to do.

    ``False`` is returned when ``field`` is missing from ``document`` or when
    ``<field>_Text`` is already present with a value other than ``0``.

    :param document: The document source (a dict)
    :param field: Name of the field to extract
    :return: Text passed through ``remove_excess_whitespace``, or ``False``
    """
    if field not in document:
        return False

    # This key was previously the literal "f{field}_Text" (the f prefix sat
    # inside the quotes), so the already-processed check could never match.
    text_key = f"{field}_Text"
    if text_key in document and document[text_key] != 0:
        return False

    value = document[field]

    # Only dicts are treated as containers: a plain string that merely
    # contains the word "Textblock" must not be indexed with a string key.
    if isinstance(value, dict) and 'Textblock' in value:
        return remove_excess_whitespace(value['Textblock'])

    return remove_excess_whitespace(value)
def index_document(self, document):
    """
    Encode the configured fields of one document and update it in the index.

    For every field in ``self.fields`` that ``get_field`` yields text for, the
    encoder output is stored under ``<field>_Embedding`` and the text under
    ``<field>_Text``. No update request is sent if nothing was collected.

    :param document: A hit-style dict with ``_id`` and ``_source`` keys
    """
    source = document["_source"]
    partial = {}

    for name in self.fields:
        text = self.get_field(source, name)
        if not text:
            continue

        partial[f"{name}_Embedding"] = self.encoder.encode(topic=text, disable_cache=True)
        partial[f"{name}_Text"] = text

    if not partial:
        return

    self.client.update(index=self.index, id=document['_id'], doc=partial)
def run(self):
    """
    Thread body: index queued documents until the queue is drained.

    ``get_nowait`` is used instead of an ``empty()`` check followed by a
    blocking ``get()``: with several workers on one queue, another worker can
    take the last item between the two calls and leave this thread blocked
    forever.
    """
    # Imported locally so the block does not depend on a module-level import
    # of ``Empty``.
    from queue import Empty

    while True:
        try:
            document = self.q.get_nowait()
        except Empty:
            break

        self.index_document(document)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id