diff --git a/deepdoc/parser/pdf_parser.py b/deepdoc/parser/pdf_parser.py index 2f70ba7f10..331d5da172 100644 --- a/deepdoc/parser/pdf_parser.py +++ b/deepdoc/parser/pdf_parser.py @@ -757,7 +757,7 @@ def cropout(bxs, ltype, poss): if ii is not None: b = louts[ii] else: - logging.warn( + logging.warning( f"Missing layout match: {pn + 1},%s" % (bxs[0].get( "layoutno", ""))) diff --git a/rag/nlp/synonym.py b/rag/nlp/synonym.py index 1a9a3f533f..ba9f0a04a8 100644 --- a/rag/nlp/synonym.py +++ b/rag/nlp/synonym.py @@ -33,7 +33,7 @@ def __init__(self, redis=None): try: self.dictionary = json.load(open(path, 'r')) except Exception: - logging.warn("Missing synonym.json") + logging.warning("Missing synonym.json") self.dictionary = {} if not redis: diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 503bcb43de..c7c66513ce 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -35,7 +35,7 @@ def __init__(self): self.info = self.es.info() break except Exception as e: - logging.warn(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.") + logging.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.") time.sleep(5) if not self.es.ping(): msg = f"Elasticsearch {settings.ES['hosts']} didn't become healthy in 120s." @@ -80,7 +80,7 @@ def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int): settings=self.mapping["settings"], mappings=self.mapping["mappings"]) except Exception: - logging.exception("ES create index error %s" % (indexName)) + logging.exception("ESConnection.createIndex error %s" % (indexName)) def deleteIdx(self, indexName: str, knowledgebaseId: str): try: @@ -88,7 +88,7 @@ def deleteIdx(self, indexName: str, knowledgebaseId: str): except NotFoundError: pass except Exception: - logging.exception("ES delete index error %s" % (indexName)) + logging.exception("ESConnection.deleteIdx error %s" % (indexName)) def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: s = Index(indexName, self.es) @@ -96,7 +96,7 @@ def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: try: return s.exists() except Exception as e: - logging.exception("ES indexExist") + logging.exception("ESConnection.indexExist got exception") if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0: continue return False @@ -115,8 +115,21 @@ def search(self, selectFields: list[str], highlightFields: list[str], condition: indexNames = indexNames.split(",") assert isinstance(indexNames, list) and len(indexNames) > 0 assert "_id" not in condition + + bqry = Q("bool", must=[]) + condition["kb_id"] = knowledgebaseIds + for k, v in condition.items(): + if not isinstance(k, str) or not v: + continue + if isinstance(v, list): + bqry.filter.append(Q("terms", **{k: v})) + elif isinstance(v, str) or isinstance(v, int): + bqry.filter.append(Q("term", **{k: v})) + else: + raise Exception( + f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.") + s = Search() - bqry = None vector_similarity_weight = 0.5 for m in matchExprs: if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params: @@ -130,13 +143,12 @@ def search(self, selectFields: list[str], highlightFields: list[str], condition: minimum_should_match = "0%" if "minimum_should_match" in m.extra_options: minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%" - bqry = Q("bool", - must=Q("query_string", fields=m.fields, + bqry.must.append(Q("query_string", fields=m.fields, type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, - boost=1), - boost=1.0 - vector_similarity_weight, - ) + boost=1)) + bqry.boost = 1.0 - vector_similarity_weight + elif isinstance(m, MatchDenseExpr): assert (bqry is not None) similarity = 0.0 @@ -150,21 +162,6 @@ def search(self, selectFields: list[str], highlightFields: list[str], condition: similarity=similarity, ) - condition["kb_id"] = knowledgebaseIds - if condition: - if not bqry: - bqry = Q("bool", must=[]) - for k, v in condition.items(): - if not isinstance(k, str) or not v: - continue - if isinstance(v, list): - bqry.filter.append(Q("terms", **{k: v})) - elif isinstance(v, str) or isinstance(v, int): - bqry.filter.append(Q("term", **{k: v})) - else: - raise Exception( - f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.") - if bqry: s = s.query(bqry) for field in highlightFields: @@ -181,8 +178,7 @@ def search(self, selectFields: list[str], highlightFields: list[str], condition: if limit > 0: s = s[offset:limit] q = s.to_dict() - print(json.dumps(q), flush=True) - logging.debug("ESConnection.search [Q]: " + json.dumps(q)) + logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q)) for i in range(3): try: @@ -194,15 +190,15 @@ def search(self, selectFields: list[str], highlightFields: list[str], condition: _source=True) if str(res.get("timed_out", "")).lower() == "true": raise Exception("Es Timeout.") - logging.debug("ESConnection.search res: " + str(res)) + logging.debug(f"ESConnection.search {str(indexNames)} res: " + str(res)) return res except Exception as e: - logging.exception("ES search [Q]: " + str(q)) + logging.exception(f"ESConnection.search {str(indexNames)} query: " + str(q)) if str(e).find("Timeout") > 0: continue raise e - logging.error("ES search timeout for 3 times!") - raise Exception("ES search timeout.") + logging.error("ESConnection.search timeout for 3 times!") + raise Exception("ESConnection.search timeout.") def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None: for i in range(3): @@ -217,12 +213,12 @@ def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict chunk["id"] = chunkId return chunk except Exception as e: - logging.exception(f"ES get({chunkId}) got exception") + logging.exception(f"ESConnection.get({chunkId}) got exception") if str(e).find("Timeout") > 0: continue raise e - logging.error("ES search timeout for 3 times!") - raise Exception("ES search timeout.") + logging.error("ESConnection.get timeout for 3 times!") + raise Exception("ESConnection.get timeout.") def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]: # Refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html @@ -250,7 +246,7 @@ def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"])) return res except Exception as e: - logging.warning("Fail to bulk: " + str(e)) + logging.warning("ESConnection.insert got exception: " + str(e)) if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE): time.sleep(3) continue @@ -268,7 +264,7 @@ def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseI return True except Exception as e: logging.exception( - f"ES failed to update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)})") + f"ESConnection.update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)}) got exception") if str(e).find("Timeout") > 0: continue else: @@ -307,7 +303,7 @@ def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseI _ = ubq.execute() return True except Exception as e: - logging.error("ES update exception: " + str(e) + "[Q]:" + str(bqry.to_dict())) + logging.error("ESConnection.update got exception: " + str(e)) if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0: continue return False @@ -329,7 +325,7 @@ def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: qry.must.append(Q("term", **{k: v})) else: raise Exception("Condition value must be int, str or list.") - logging.debug("ESConnection.delete [Q]: " + json.dumps(qry.to_dict())) + logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict())) for _ in range(10): try: res = self.es.delete_by_query( @@ -338,7 +334,7 @@ def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: refresh=True) return res["deleted"] except Exception as e: - logging.warning("Fail to delete: " + str(filter) + str(e)) + logging.warning("ESConnection.delete got exception: " + str(e)) if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE): time.sleep(3) continue @@ -447,10 +443,10 @@ def sql(self, sql: str, fetch_size: int, format: str): request_timeout="2s") return res except ConnectionTimeout: - logging.exception("ESConnection.sql timeout [Q]: " + sql) + logging.exception("ESConnection.sql timeout") continue except Exception: - logging.exception("ESConnection.sql got exception [Q]: " + sql) + logging.exception("ESConnection.sql got exception") return None logging.error("ESConnection.sql timeout for 3 times!") return None