CamelSalesforcesourceSourceConnector: Does not return Id/sObjectId from Salesforce #1592
Comments
The headers coming from Camel are prefixed as CamelHeader.<header_name>, and there should be a list of record IDs. Can you provide the content and Kafka headers of one of the records you got?
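For anyone wanting to capture exactly that, a minimal sketch of dumping a record's value and headers with kafka-python; the topic and broker are taken from the connector config quoted at the bottom of this issue, and the library choice is an assumption, not something from the thread.

from kafka import KafkaConsumer

# Topic and broker taken from the connector config in this issue
consumer = KafkaConsumer(
    "camelsfstream",
    bootstrap_servers="uds-kafka:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10_000,
)

for record in consumer:
    print("value:", record.value)
    # record.headers is a list of (key, bytes) tuples; Camel-produced headers
    # appear here with the CamelHeader. prefix mentioned above
    for key, value in record.headers or []:
        print("header:", key, value)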
@oscerd I believe I have found the issue/workaround. I will paste images here to make things more readable. FYI, I am using Redpanda Console to make life easier.

Using v3.18.2
Using the connector config described in the problem above, I get the following info in my Kafka:
Value: [screenshot]
Header: [screenshot]
Synopsis: As you can see, RecordId does not exist in the value or the header. Please note the "Account Name" I provided in Salesforce was "Using 3.18.2 without rawPayload" - I will come back to this.

Using 3.18.2 with rawPayload=true
I tried setting […]

Using 0.11.5
I decided to use this version to avoid the kamelet altogether.
Connector Config: [screenshot]
Please note the above image has […]
Value (rawPayload=false): [screenshot]
Value (rawPayload=true): [screenshot]
The header is the same as before, but as you can see, recordIds come with the value payload when rawPayload is set to true. Please also note I had to use the […]

Workaround in v3.18.2
There is no way I could find to set […]
NOTE: The […] By including this property via the topicName, I was able to receive the value as below.
Value (enforced rawPayload=true): [screenshot]

Other Issues with v3.18.2 Workaround
The final workaround does not give proper JSON, rather a string with […]. I also tried using an SMT as follows, as mentioned by you in another GitHub issue (related to Telegram, I believe): [screenshot]. Unfortunately, this converts the value payload to a JWT-style string, which then needs to be decoded.

SUMMARY
If developers want to use v3.18.2, the above workaround will do the job as intended, but it is not ideal, i.e. not clean. Otherwise we need to revert to v0.11.5 for now. Overall, I believe […]. Also, we haven't tested Salesforce as a sink with v3.18.2 yet, which might reveal other issues. Currently, I need to discuss with my team about testing sink versions for smooth sailing before finalizing the actual connector versions. I'll try to keep you posted.

Fast Turnaround
I might be talking out of my arse here, but I think the fastest solution for the newer versions of the Kafka connector for Salesforce using kamelets is to allow developers to create the endpoint URI directly in the config class and remove all defaults from the properties. Having the option to use properties and/or directly create the endpoint URI would be flexible.
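For context on why rawPayload made the difference: in Salesforce Change Data Capture, the record IDs are carried inside the ChangeEventHeader of the event payload rather than as an ordinary field. A rough sketch of the shape of a /data/AccountChangeEvent message (illustrative values, not a capture from this setup; the record ID is a made-up example):

# Rough shape of a Change Data Capture message (illustrative, not a real capture)
cdc_message = {
    "channel": "/data/AccountChangeEvent",
    "data": {
        "schema": "<schema id>",
        "payload": {
            "ChangeEventHeader": {
                "entityName": "Account",
                "changeType": "UPDATE",
                "recordIds": ["001xx000003GYcwAAG"],  # the sObject Id this issue is about
                # transactionKey, commitTimestamp, changeOrigin, ... omitted
            },
            "Name": "Using 3.18.2 without rawPayload",
            "LastModifiedDate": "2023-01-01T00:00:00.000Z",
        },
        "event": {"replayId": 1},
    },
}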
We don't have the capacity to follow the development of Kamelets and at the same time generate the connectors as we have done in the past. We need to find a tradeoff, and the tradeoff is using Kamelets as building blocks for Kafka connectors. We could add more options to the Kamelets, like I'll do for rawPayload. If you want to use the component fully and with all the options, my suggestion is to use plain Camel with the Camel-Salesforce and Camel-Kafka components and avoid the Kafka connector from Camel-Kafka-Connector. We're not going to change Kamelets as the building block in the short term, and probably not in the long term either.
And as always, you're welcome to help on the Kamelets side, or by continuing to report issues. Thanks for all your findings!
@oscerd That makes complete sense. Honestly, I would love to help out; figuring out the workaround and playing detective is fun, but I have zero Java knowledge. Considering what you have mentioned regarding the way forward with the Kafka connector, I will talk to my company about developing a testing framework for the Salesforce connector. This won't be in Java, unfortunately, as none of us on the team use it, but rather a simple setup & scripts like I am doing now. I will share that methodology & scripts. Whenever new versions come out, and as long as I am working on this project, I will make sure to test using those scripts and post the results and findings on that thread. So far I have the following - […]
I will be moving forward with testing the Salesforce sink connectors, and will update you with my findings.
Hey @apoorvmintri, how did […]
not real JSON.
@aonamrata I downloaded the jar file from https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-salesforce-kafka-connector/0.11.5/ (note this is v0.11.5) and then used Kafka Connect's REST API to create my connector - [screenshot]
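A minimal sketch of what that kind of REST call can look like, assuming a local Kafka Connect worker on port 8083. The trimmed config only shows the structure: the actual 0.11.5 property set was in the screenshot that didn't survive here, and its property names differ from the camel.kamelet.salesforce-source.* names used by 3.18.2.

import requests

# Assumed Kafka Connect REST endpoint; adjust host/port for your deployment
connect_url = "http://localhost:8083/connectors"

connector = {
    "name": "sf-source-connector",
    "config": {
        # Connector class name as used by the 0.11.x line; verify against the downloaded jar
        "connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
        # ... Salesforce login/topic properties go here; 0.11.x uses the
        # camel.source.* / camel.component.salesforce.* naming scheme rather
        # than camel.kamelet.salesforce-source.*
        "topics": "camelsfstream",
    },
}

resp = requests.post(connect_url, json=connector)
resp.raise_for_status()
print(resp.json())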
Now I'd like to add that we have moved away from the Camel connectors because we faced lots of issues with the Salesforce sink connector on 0.11.5, 3.18.2 & 4.0.0. We just couldn't make it work and decided to build our own set of producers and consumers with Python. The Salesforce CDC was achieved with a Python-based CometD client - https://pypi.org/project/aiosfstream/. You'll have to fork that project and update it if you plan to use it with Python 3.10 or greater, so if you are on 3.7.x to 3.9.x you should be fine; not sure if it'll work with 3.6.x or lower.
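For anyone weighing the same move, a minimal sketch of subscribing to the Account change events with aiosfstream. The credentials are placeholders, and the channel name comes from the connector config in this issue; this is a sketch based on the library's documented API, not code from the thread.

import asyncio
from aiosfstream import SalesforceStreamingClient

async def stream_account_changes():
    # Connected-app credentials are placeholders
    async with SalesforceStreamingClient(
        consumer_key="<client id>",
        consumer_secret="<client secret>",
        username="<username>",
        password="<password>",
    ) as client:
        # Same CDC channel the Camel connector was subscribed to
        await client.subscribe("/data/AccountChangeEvent")
        async for message in client:
            payload = message["data"]["payload"]
            header = payload["ChangeEventHeader"]
            print(header["changeType"], header["recordIds"])

asyncio.run(stream_account_changes())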
Connector: v3.18.2
Issue:
The Salesforce source Kafka connector does not include the Salesforce recordId/sObjectId on any event - Create, Update, etc. It's not in the message value nor in the headers, so I am unable to relate an update to the particular record that generated the update event. The message only contains the fields, last modified date, etc. Am I missing something?
Connector Config:
{ "name": "sf-source-connector", "config": { "tasks.max":"1", "connector.class":"org.apache.camel.kafkaconnector.salesforcesource.CamelSalesforcesourceSourceConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "camel.kamelet.salesforce-source.notifyForFields": "ALL", "camel.main.streamCachingEnabled": "false", "camel.idempotency.kafka.bootstrap.servers": "uds-kafka:9092", "camel.kamelet.salesforce-source.query": "SELECT * FROM Account", "camel.kamelet.salesforce-source.topicName": "subscribe:/data/AccountChangeEvent", "camel.kamelet.salesforce-source.loginUrl": "https://login.salesforce.com/", "camel.kamelet.salesforce-source.clientId": "<Redacted>", "camel.kamelet.salesforce-source.clientSecret": "<Redacted>", "camel.kamelet.salesforce-source.userName": "<Redacted>", "camel.kamelet.salesforce-source.password": "<Redacted>", "camel.kamelet.salesforce-source.notifyForOperationCreate": "true", "camel.kamelet.salesforce-source.notifyForOperationUpdate": "true", "camel.kamelet.salesforce-source.notifyForOperationDelete": "true", "camel.kamelet.salesforce-source.notifyForOperationUndelete": "true", "topics": "camelsfstream" } }