Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MQTT streaming source #5432

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jules-ch
Copy link

@jules-ch jules-ch commented Sep 20, 2024

Description

This PR add MQTT as a streaming source

Implementation relies on paho-mqtt, batching reads is performed using a Queue to fill up messages and handling them with batch.

Related #3876

Checklist

  • The PR is tagged with proper labels (bug, enhancement, feature, documentation)
  • I have performed a self-review of my own code
  • I have added unit tests that prove my fix is effective or that my feature works
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation

@jules-ch jules-ch marked this pull request as draft September 20, 2024 19:41
- Add paho_mqtt as requirements
- Implement MQTT streaming source with MQTT V5 protocol
@jules-ch
Copy link
Author

Right now, the handler is Callable[[mqtt.MqttMessage], Any], we might change that to support Serde configuration to support protobuf and avro like kafka source does.

@jules-ch jules-ch marked this pull request as ready for review September 26, 2024 20:30
@jules-ch jules-ch changed the title Draft: Add MQTT streaming source Add MQTT streaming source Sep 26, 2024
@jules-ch
Copy link
Author

jules-ch commented Oct 3, 2024

Should be ready for review @wangxiaoyou1993

I think it might be a good idea to change handler to take a dict with the same keys as mqtt.MQTTMessage to handle parsing and decoding with future support for protobuf or avro.

@wangxiaoyou1993
Copy link
Member

Should be ready for review @wangxiaoyou1993

I think it might be a good idea to change handler to take a dict with the same keys as mqtt.MQTTMessage to handle parsing and decoding with future support for protobuf or avro.

sorry for the delay. will review it soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants