import requests

def fetch_data(url):
    # Fetch the URL and return the decoded JSON payload.
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail fast on HTTP errors instead of parsing an error body
    return response.json()

data = fetch_data("http://example.com/department-data")
print(data)
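
# A minimal sketch of calling fetch_data defensively: network errors and non-2xx
# responses raise from requests, so the caller can decide how to recover. The
# fallback of printing and continuing with None is an assumption, not a requirement.
try:
    data = fetch_data("http://example.com/department-data")
except requests.exceptions.RequestException as exc:
    print(f"Request failed: {exc}")
    data = None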
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Set up the streaming environment and its Table API wrapper.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Register a Kafka-backed source table. `timestamp` is a reserved word in
# Flink SQL, so the column name must be escaped with backticks. The watermark
# tolerates events arriving up to 5 seconds out of order.
t_env.execute_sql("""
    CREATE TABLE sensor_data (
        id STRING,
        temperature DOUBLE,
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sensor-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")