Delta Live Tables with EventHub

I am trying to create a stream from Event Hubs using Delta Live Tables, but I am having trouble installing the library. Is it possible to install a Maven library on a Delta Live Tables cluster using %sh / pip?

I would like to install com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17

https://learn.microsoft.com/pl-pl/azure/databricks/spark/latest/structured-streaming/streaming-event-hubs
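For context, this is roughly what the connector-based approach from that article looks like on a regular (non-DLT) cluster, which is why the Maven package would be needed; the connection string below is a placeholder:

# Sketch of the azure-eventhubs-spark connector on an interactive/job cluster (not DLT);
# requires com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17 installed from Maven.
connection_string = "Endpoint=sb://<namespace>.servicebus.windows.net/;...;EntityPath=<eventhub-name>"

eh_conf = {
    # The connector expects the connection string encrypted with its helper class;
    # `sc` is the SparkContext predefined in Databricks notebooks.
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
}

df = spark.readStream.format("eventhubs").options(**eh_conf).load()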

Isoclinal answered 24/5, 2022 at 9:59 Comment(1)
Currently, cluster init scripts are not supported in the Delta Live Tables framework; I've added this comment to prevent other users from trying to solve this issue that way. – Isoclinal

Right now it's not possible to use external connectors or Java libraries with Delta Live Tables. But for Event Hubs there is a workaround: you can connect through the built-in Kafka connector, you just need to specify the correct options as described in the documentation:

import dlt

@dlt.table
def eventhubs():
  # Authenticate against the Event Hubs Kafka endpoint with SASL PLAIN,
  # passing the Event Hubs connection string as the password.
  readConnectionString = "Endpoint=sb://<....>.windows.net/;?.."
  eh_sasl = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{readConnectionString}";'
  kafka_options = {
     "kafka.bootstrap.servers": "<eh-ns-name>.servicebus.windows.net:9093",
     "kafka.sasl.mechanism": "PLAIN",
     "kafka.security.protocol": "SASL_SSL",
     "kafka.request.timeout.ms": "60000",
     "kafka.session.timeout.ms": "30000",
     "startingOffsets": "earliest",
     "kafka.sasl.jaas.config": eh_sasl,
     "subscribe": "<topic-name>",
  }
  return spark.readStream.format("kafka") \
    .options(**kafka_options).load()
Embassy answered 25/5, 2022 at 14:57 Comment(0)

Setting up a DLT pipeline with Azure Event Hubs as the source: Python for the bronze table that reads from Event Hubs, SQL for the silver and gold tables.

In Event Hubs I am sending the following JSON; make sure it is a single object, not a list (a sample producer sketch follows the payload):

{
    "id": "2",
    "name": "xyz1"
}
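For reference, a minimal sketch of publishing this event with the azure-eventhub Python SDK; the connection string and Event Hub name are placeholders, adjust them to your namespace:

# Hypothetical producer sketch using the azure-eventhub SDK (pip install azure-eventhub);
# connection string and event hub name are placeholders.
import json
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...",
    eventhub_name="abc",
)

payload = {"id": "2", "name": "xyz1"}  # a single JSON object, not a list

with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(payload)))
    producer.send_batch(batch)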

Refer to the Databricks docs for how to set up the bronze layer in Python.

Python notebook

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = "xyz-eventhub1"
EH_NAME                         = "abc"

EH_CONN_SHARED_ACCESS_KEY_NAME  = "RootManageSharedAccessKey"
# SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = "xyz="

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"

# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : "60000",
  "kafka.session.timeout.ms" : "60000",
#   "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : "false",
  "startingOffsets"          : "earliest"
}

# PAYLOAD SCHEMA
payload_ddl = """id STRING, name STRING"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  print(df)
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    # .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("id", expr("parsed_records.id"))
    .withColumn("name", expr("parsed_records.name"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp")) # when event was enqueued
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("bronze_timestamp", col("current_timestamp"))
    .withColumn("bronze_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw events from kafka",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def kafka_bronze():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )
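If you want to sanity-check the parse logic before wiring it into the pipeline, a quick hypothetical test on a static DataFrame that mimics the Kafka columns (run on an interactive cluster, not inside DLT) could look like this:

# Hypothetical check of parse() on a static DataFrame with Kafka-like columns;
# reuses the notebook's wildcard import of pyspark.sql.functions, not part of the pipeline.
sample = (spark.createDataFrame(
            [('{"id": "2", "name": "xyz1"}', "k1")],
            "value STRING, key STRING")
          .withColumn("timestamp", current_timestamp())
          .withColumn("topic", lit("abc")))

sample.transform(parse).select("id", "name", "eh_enqueued_date").show(truncate=False)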

Sql notebook

Silver table

CREATE STREAMING LIVE TABLE kafka_cleaned(
  CONSTRAINT id_not_null EXPECT (id IS NOT NULL)
)
COMMENT "Cleaned kafka table"
TBLPROPERTIES ("companyPipeline.quality" = "silver")
AS
SELECT 
cast(id as int) as id,
name,
eh_enqueued_timestamp,
bronze_timestamp,
CURRENT_TIMESTAMP as silver_timestamp
FROM STREAM(LIVE.kafka_bronze) 

Gold table

CREATE STREAMING LIVE TABLE kafka_gold
COMMENT "count of id per name"
TBLPROPERTIES ("companyPipeline.quality" = "gold")
AS
SELECT count(id) as count_id,
name,
CURRENT_TIMESTAMP as gold_timestamp 
FROM STREAM(LIVE.kafka_cleaned)
group by name

Use both of these notebooks as sources in the DLT pipeline.
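If you prefer to create the pipeline programmatically rather than through the UI, a rough sketch with the databricks-sdk Python package could look like the following; the pipeline name, target schema, and notebook paths are placeholders:

# Hypothetical pipeline creation via the Databricks SDK for Python (pip install databricks-sdk);
# name, target, and notebook paths below are placeholders.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import PipelineLibrary, NotebookLibrary

w = WorkspaceClient()

w.pipelines.create(
    name="eventhub-dlt-pipeline",
    target="eventhub_demo",  # schema where the bronze/silver/gold tables land
    continuous=False,
    libraries=[
        PipelineLibrary(notebook=NotebookLibrary(path="/Repos/<user>/bronze_eventhub_python")),
        PipelineLibrary(notebook=NotebookLibrary(path="/Repos/<user>/silver_gold_sql")),
    ],
)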

Uriia answered 21/8, 2023 at 21:38 Comment(0)
