MoleculerRetryableError: Adapter not yet connected. Skipping publishing #83

Open
4 tasks done
valeeum opened this issue Aug 26, 2024 · 3 comments

Comments

@valeeum

valeeum commented Aug 26, 2024

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

When I call this.broker.sendToChannel() in my service's started method, I get the following error:

MoleculerRetryableError: Adapter not yet connected. Skipping publishing

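  started() {
    setTimeout(async () => {
      try {
        // sendToChannel() returns a promise, so it must be awaited for the catch below to see a rejection
        await this.broker.sendToChannel(
          `user.events`,
          { blah: 2 },
        );
      } catch (error) {
        this.logger.error({ error });
      }
    }, 1000);
  },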

In the snippet above, waiting 1 second after started is called produces the error. When I wait 10 seconds, I don't get it.
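Since the error is reported as retryable, one possible stopgap is to retry the publish until the adapter is ready. The sketch below is only an illustration under assumptions: `retry` and `createFakeBroker` are hypothetical helpers written for this example (neither is part of Moleculer), and the fake broker simply becomes "connected" after 50 ms so that the snippet runs without any dependencies.

```javascript
// Hypothetical helper, not part of Moleculer: retries an async operation
// with a fixed delay between attempts and rethrows the last error.
async function retry(operation, { attempts = 5, delayMs = 500 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < attempts) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Stand-in for a broker whose adapter connects after a delay (demo assumption only).
function createFakeBroker(connectAfterMs) {
  let connected = false;
  setTimeout(() => {
    connected = true;
  }, connectAfterMs);
  return {
    async sendToChannel(channelName, payload) {
      if (!connected) {
        throw new Error("Adapter not yet connected. Skipping publishing");
      }
      return { channelName, payload };
    },
  };
}

const fakeBroker = createFakeBroker(50);
const retryDemo = retry(
  () => fakeBroker.sendToChannel("user.events", { blah: 2 }),
  { attempts: 10, delayMs: 20 },
);
```

In a real service the call would look like `await retry(() => this.broker.sendToChannel("user.events", { blah: 2 }))`. This helper retries on any error; filtering on the error type would be a reasonable refinement.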

I am using the latest version of the Redis adapter.

Expected Behavior

I expect the broker to wait until the connection is made before starting services.

@FlavioF

FlavioF commented Oct 17, 2024

I am experiencing the same behaviour. Did you come up with a solution, @valeeum?

@valeeum
Author

valeeum commented Oct 17, 2024

@FlavioF nope, unfortunately not.

@alon-gb

alon-gb commented Dec 4, 2024

It seems to be an internal race condition. The middleware calls the adapter's async connect() function without waiting for it to complete. The NATS adapter (which is what I use, but other adapters are similar) expects, as a precondition for publishing, that connect() has finished successfully and set the internal connected field to true.
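The race described above can be reproduced in isolation. The following is a toy model only: `ToyAdapter` is a hypothetical class written for this example, with a 20 ms timer standing in for network latency. It is not the real adapter code, but it shows why publishing fails when connect() is started and not awaited.

```javascript
// Toy adapter, loosely modelled on the description above; not the real adapter.
class ToyAdapter {
  constructor() {
    this.connected = false;
  }

  async connect() {
    // Simulate network latency before the connection becomes usable.
    await new Promise((resolve) => setTimeout(resolve, 20));
    this.connected = true;
  }

  async publish(channelName, payload) {
    if (!this.connected) {
      throw new Error("Adapter not yet connected. Skipping publishing");
    }
    return "published";
  }
}

async function raceDemo() {
  const outcomes = {};

  // Fire-and-forget: connect() is started but not awaited before publishing.
  const racy = new ToyAdapter();
  const pending = racy.connect();
  outcomes.withoutAwait = await racy
    .publish("user.events", { blah: 2 })
    .catch((err) => err.message);
  await pending; // let the toy connection finish before moving on

  // Awaited: publish only runs once connect() has completed.
  const safe = new ToyAdapter();
  await safe.connect();
  outcomes.withAwait = await safe.publish("user.events", { blah: 2 });

  return outcomes;
}

const raceResult = raceDemo();
```

Here `withoutAwait` ends up holding the error message, while `withAwait` holds the successful result.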

I was able to monkey-patch it locally like this:

	constructor(opts) {
		//...
		this.resetConnectionPromise();
	}

	resetConnectionPromise() {
		this.connectionPromise = new Promise((resolve, reject) => {
			this.resolveConnectionPromise = resolve;
			this.rejectConnectionPromise = reject;
		});
	}

	/**
	 * Connect to the adapter.
	 */
	async connect() {		
		try { 
			this.connection = await NATS.connect(this.opts.nats.connectionOptions);

			this.manager = await this.connection.jetstreamManager();

			this.client = this.connection.jetstream(); // JetStreamOptions
			
			this.connected = true;

			this.resolveConnectionPromise();
		} catch(err){ 
			this.logger.fatal("Error while connecting to NATS", err);
			this.rejectConnectionPromise(err);
		}
	}

	/**
	 * Disconnect from adapter
	 */
	async disconnect() {
		this.stopping = true;		

		try {
			if (this.connection) {
				this.logger.info("Closing NATS JetStream connection...");
				await this.connection.drain();
				await this.connection.close();

				this.logger.info("NATS JetStream connection closed.");
			}
		} catch (error) {
			this.logger.error("Error while closing NATS JetStream connection.", error);
		}

		this.connected = false;
		this.resetConnectionPromise();
	}

	async publish(channelName, payload, opts = {}) {
		// Adapter is stopping. Publishing no longer is allowed
		if (this.stopping || !this.connectionPromise) return;

		await this.connectionPromise;
		// rest of the publish method

This effectively queues all published messages until the connection attempt succeeds or fails.
I'll create a PR for this; so far I can confirm that it works as expected.
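To make the queueing behaviour concrete, here is a self-contained sketch of the same deferred-promise pattern. `DeferredAdapter` is a hypothetical stand-in with a simulated 20 ms connection delay, not the NATS adapter itself. The extra `.catch(() => {})` is an addition of this sketch (it is not in the patch above) and assumes that a failed connection with no pending publishers should not surface as an unhandled rejection.

```javascript
// Toy adapter illustrating the deferred connection promise; only the pattern
// mirrors the patch, the class and timings are hypothetical.
class DeferredAdapter {
  constructor() {
    this.connected = false;
    this.sent = [];
    this.resetConnectionPromise();
  }

  resetConnectionPromise() {
    this.connectionPromise = new Promise((resolve, reject) => {
      this.resolveConnectionPromise = resolve;
      this.rejectConnectionPromise = reject;
    });
    // Sketch-only addition: mark the promise as handled so a rejection with
    // no waiting publishers does not become an unhandled rejection.
    this.connectionPromise.catch(() => {});
  }

  async connect() {
    await new Promise((resolve) => setTimeout(resolve, 20)); // simulated latency
    this.connected = true;
    this.resolveConnectionPromise();
  }

  async publish(channelName, payload) {
    await this.connectionPromise; // waits until connect() resolves the promise
    this.sent.push({ channelName, payload, connectedAtSend: this.connected });
  }
}

async function deferredDemo() {
  const adapter = new DeferredAdapter();
  // Publish before connecting: both calls wait instead of failing.
  const first = adapter.publish("user.events", { n: 1 });
  const second = adapter.publish("user.events", { n: 2 });
  const sentBeforeConnect = adapter.sent.length; // nothing has been sent yet
  await adapter.connect();
  await Promise.all([first, second]);
  return { sentBeforeConnect, sent: adapter.sent };
}

const deferredResult = deferredDemo();
```

Both messages are held back until connect() finishes and are then delivered in the order they were published.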

One important caveat: with this patch, emit/sendToChannel() must not be called and awaited inside the service's "started" function, because the adapter's internal connect() function is only called at a later point. I had to wrap my code in setImmediate(), and that made everything work.
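A minimal sketch of the setImmediate() wrapping might look like the following. The service name, the stub broker, and the stub logger are hypothetical and exist only so the snippet runs without Moleculer installed; in a real service, `this.broker` and `this.logger` are provided by the framework.

```javascript
// Sketch of a service schema that defers its first publish out of started().
const serviceSchema = {
  name: "user-events-publisher", // hypothetical service name
  started() {
    // Do not await the publish here; schedule it to run after started() returns.
    setImmediate(async () => {
      try {
        await this.broker.sendToChannel("user.events", { blah: 2 });
      } catch (error) {
        this.logger.error({ error });
      }
    });
  },
};

// Stub broker and logger (demo assumptions only) that record what was called.
const calls = [];
const stubContext = {
  broker: {
    sendToChannel: async (channelName, payload) => {
      calls.push({ channelName, payload });
    },
  },
  logger: {
    error: (details) => calls.push({ error: details }),
  },
};

serviceSchema.started.call(stubContext);
const callsRightAfterStarted = calls.length; // the publish has not run yet

// Resolve after the deferred publish has had a chance to run.
const deferredCalls = new Promise((resolve) => {
  setImmediate(() => setImmediate(() => resolve(calls)));
});
```

The point of the design is that started() returns immediately, so service startup is never blocked on a connection that has not been initiated yet.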

3 participants