Created at 11am, Jan 11
wolzogSoftware Development
0
Kafka to Apache Spark with Upsert Mode to avoid duplicates
blqacbQrye7684t8eehRp4lOCUNWP-rIQ-wWbp0PxSs
File Type
DOCX
Entry Count
6
Embed. Model
jina_embeddings_v2_base_en
Index Type
hnsw

In a Databricks environment, prepare a Kafka-to-PySpark script that writes in upsert mode.

selectExpr("value.event", "value.messageid", "value.userid", "value.properties.productid", "value.context.source","timestamp") select_views_df = views_explode_df.select(col("event"), col("messageid"), col("userid"), col("productid"), col("source"), col("timestamp")).alias("v") return select_views_df ### Category Data #### def readCategoryData(self): category_df = spark.read \ .options(delimiter=",", header=True) \ .csv("/FileStore/resources/product_category_map.csv") return category_df def exprCategory(self, categoryDf): select_category = categoryDf.select("productid", "categoryid") return select_category ### View and Category JOIN ### def ViewJoinCategory(self, view_df, category_df): match_df = view_df.join(category_df.alias("c"), col("v.productid") == col("c.productid"), "inner") final_df = match_df.select("event", "messageid", "userid","v.productid", "categoryid", "source","timestamp") return final_df
id: 8e80dda120e3a1a2f736385ab4be2429 - page: 2
createOrReplaceTempView("views_df_temp_view") merge_statement = """MERGE INTO view_category s USING views_df_temp_view t ON s.productid == t.productid AND s.timestamp == t.timestamp WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """ view_df._jdf.sparkSession().sql(merge_statement) def process(self): # View schema = self.getViewSchema() read_data_kafka = self.readViewData() get_view_data_kafka = self.getViewData(read_data_kafka,schema) df_view = self.explodeViewData(get_view_data_kafka) # Category read_data_category = self.readCategoryData() df_category = self.exprCategory(read_data_category) # Join join_df = self.ViewJoinCategory(df_view, df_category) sQuery = join_df.writeStream \ .queryName("ingest_view_category_data") \ .foreachBatch(self.upsert) \ .option("checkpointLocation", "/FileStore/resources/chekpoint/final_view_df") \ .outputMode("append") \ .start() pw = ProductView() pw.process()
id: a5ece5793afb0fe09194a18b599c902a - page: 2
Note: To integrate with Kafka, you need to add the spark-kafka Maven packages. Version compatibility is important here. Result:
id: 05ad354609ed329a2b60026eebc69671 - page: 3
How to Retrieve?
# Search

curl -X POST "https://search.dria.co/hnsw/search" \
-H "x-api-key: <YOUR_API_KEY>" \
-H "Content-Type: application/json" \
-d '{"rerank": true, "top_n": 10, "contract_id": "blqacbQrye7684t8eehRp4lOCUNWP-rIQ-wWbp0PxSs", "query": "What is alexanDRIA library?"}'
        
# Query

curl -X POST "https://search.dria.co/hnsw/query" \
-H "x-api-key: <YOUR_API_KEY>" \
-H "Content-Type: application/json" \
-d '{"vector": [0.123, 0.5236], "top_n": 10, "contract_id": "blqacbQrye7684t8eehRp4lOCUNWP-rIQ-wWbp0PxSs", "level": 2}'