OptimalBits / bull · Issues · #1746
Closed
Issue created May 23, 2020

Closing shared redis connection causes infinite loop and crash [BUG]

Created by: dobesv

Description

If you share an ioredis instance between queues and then disconnect the redis client, the processing loop uses a lot of memory and CPU and spins forever.

I believe the issue occurs because of the way errors are "swallowed" in this function:

Queue.prototype.getNextJob = function() {
  if (this.closing) {
    return Promise.resolve();
  }

  if (this.drained) {
    //
    // Waiting for new jobs to arrive
    //
    return this.bclient
      .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
      .then(
        jobId => {
          if (jobId) {
            return this.moveToActive(jobId);
          }
        },
        err => {
          // Swallow error
          if (err.message !== 'Connection is closed.') {
            console.error('BRPOPLPUSH', err);
          }
        }
      );
  } else {
    return this.moveToActive();
  }
};
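
For context, the shape of the problem can be sketched outside Bull (this is only an illustration, not Bull's actual worker loop): once redis is disconnected, brpoplpush rejects immediately, the rejection is swallowed into a resolved promise, and the caller asks for the next job again with no pause in between.

// Illustration only, not Bull's code: a swallowed rejection turns every failed
// poll into an instantly-resolved promise, so the worker loop never pauses.
const getNextJob = () =>
  Promise.reject(new Error('Connection is closed.')).then(
    jobId => {
      // would move the job to the active list here
    },
    err => {
      // Swallow error: resolves with no job and, crucially, no delay
      if (err.message !== 'Connection is closed.') {
        console.error('BRPOPLPUSH', err);
      }
    }
  );

const run = async () => {
  // Spins at full CPU as soon as the connection is closed
  while (true) {
    await getNextJob();
  }
};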

Probably what needs to happen is that the queue should go into some sort of backoff loop, waiting until either (1) this.closing === true or (2) the connection starts working again.

A possible fix might look like this:

Queue.prototype.getNextJob = function () {
  if (this.closing) {
    return Promise.resolve();
  }

  if (this.drained) {
    if (this.getNextJobBackoff) {
      console.log('getNextJob() in backoff...');
      return this.getNextJobBackoff;
    }
    //
    // Waiting for new jobs to arrive
    //
    return this.bclient
      .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
      .then(
        (jobId) => {
          if (jobId) {
            return this.moveToActive(jobId);
          }
        },
        (err) => {
          // Add a delay before the next attempt (promisify here is util.promisify)
          this.getNextJobBackoff = promisify(setTimeout)(
            1000
          ).then(() => {
            this.getNextJobBackoff = null;
          });
          // Swallow 'Connection is closed.' error
          if (err.message !== 'Connection is closed.') {
            throw err;
          }
        }
      );
  } else {
    return this.moveToActive();
  }
};

Note that this method still handles the closing case immediately, because we do not delay the immediate return/error; we only delay if we come back into getNextJob with this.closing unset.
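
The same backoff idea can be shown in isolation. This is just a sketch with illustrative names (withBackoff and delay are not Bull APIs), assuming promisify comes from Node's util module:

const { promisify } = require('util');
const delay = promisify(setTimeout);

let backoff = null;

const withBackoff = attempt => {
  // If a backoff delay is already pending, every concurrent caller waits on it
  if (backoff) {
    return backoff;
  }
  return attempt().catch(err => {
    // Schedule a single shared delay before the next attempt
    backoff = delay(1000).then(() => {
      backoff = null;
    });
    // Swallow 'Connection is closed.' and rethrow everything else
    if (err.message !== 'Connection is closed.') {
      throw err;
    }
  });
};

Calling withBackoff(pollRedis) repeatedly would then behave like the patched getNextJob: failed polls are retried at most once per second instead of immediately.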

Minimal, Working Test code to reproduce the issue.

// reproduce.js
const Queue = require('bull');
const Redis = require('ioredis');
const Bluebird = require('bluebird');

const main = async () => {
  const clients = {};
  // Reuse one ioredis connection per client type so that all three queues share connections
  const createClient = (type, redisOpts) => {
    const redisOptions = {
      ...redisOpts,
      connectionName: ['bull', type].join('-'),
    };
    const k = JSON.stringify(redisOptions);
    return clients[k] || (clients[k] = new Redis(redisOptions));
  };
  const q1 = new Queue('1', { createClient });
  const q2 = new Queue('2', { createClient });
  const q3 = new Queue('3', { createClient });
  q1.process(1, job => console.log('q1', job.data)).catch(err =>
    console.error(err),
  );
  q2.process(2, job => console.log('q2', job.data)).catch(err =>
    console.error(err),
  );
  q3.process(3, job => console.log('q3', job.data)).catch(err =>
    console.error(err),
  );
  console.log('processors ready');
  await Bluebird.delay(1000);
  await Promise.all([
    q1.add({ q: 1 }),
    q1.add({ q: 1.2 }),
    q2.add({ q: 2 }),
    q2.add({ q: 2.1 }),
    q2.add({ q: 2.2 }),
    q3.add({ q: 3 }),
    q3.add({ q: 3.1 }),
    q3.add({ q: 3.2 }),
    q3.add({ q: 3.3 }),
    q3.add({ q: 3.4 }),
  ]);
  await Bluebird.delay(1000);
  Object.values(clients).forEach(client => client.disconnect());
  console.log('disconnected redis');
  await q1.close();
  console.log('closed q1');
  await q2.close();
  console.log('closed q2');
  await q3.close();
  console.log('closed q3');
  console.log('disconnecting redis again');
  Object.values(clients).forEach(client => client.disconnect());
  console.log('end of main');
};

main();
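
Running the script with a 100 MB heap cap (NODE_OPTIONS=--max-old-space-size=100) makes the unbounded growth hit the limit within a few seconds: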
$ NODE_OPTIONS=--max-old-space-size=100 node ./reproduce.js 
processors ready
q1 { q: 1 }
q2 { q: 2 }
q2 { q: 2.1 }
q3 { q: 3 }
q3 { q: 3.1 }
q3 { q: 3.2 }
q1 { q: 1.2 }
q2 { q: 2.2 }
q3 { q: 3.3 }
q3 { q: 3.4 }

<--- Last few GCs --->
[3316:0x3825890]     5861 ms: Mark-sweep 99.3 (100.6) -> 98.9 (100.9) MB, 54.1 / 0.0 ms  (+ 5.7 ms in 8 steps since start of marking, biggest step 2.9 ms, walltime since start of marking 63 ms) (average mu = 0.122, current mu = 0.060) allocation failure
[3316:0x3825890]     5898 ms: Mark-sweep 99.5 (100.9) -> 99.1 (100.9) MB, 25.3 / 0.0 ms  (+ 8.2 ms in 10 steps since start of marking, biggest step 3.2 ms, walltime since start of marking 38 ms) (average mu = 0.117, current mu = 0.110) allocation failure

<--- JS stacktrace --->

==== JS stack trace =========================================

    0: ExitFrame [pc: 0x1374fd9]
    1: StubFrame [pc: 0x1343cf8]
    2: StubFrame [pc: 0x1311dc7]
    3: EntryFrame [pc: 0x12f2b78]
    4: ExitFrame [pc: 0x12f622d]
Security context: 0x3b56550008a1 <JSObject>
    5: processTicksAndRejections [0xbab20a449d1] [internal/process/task_queues.js:~65] [pc=0x2ec6f29ec6b8](this=0x0bab20a42a09 <process map = 0x31243d9a8111>)
    6: InternalFrame [pc: 0x12f2c1d]
    7: EntryFrame [pc: 0x12f29f8]

=...

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

Writing Node.js report to file: report.20200522.172937.3316.0.001.json
Node.js report completed
 1: 0x9da7c0 node::Abort() [node]
 2: 0x9db976 node::OnFatalError(char const*, char const*) [node]
 3: 0xb39f1e v8::Utils::ReportOOMFailure(v8::internal::Isolate*, char const*, bool) [node]
 4: 0xb3a299 v8::internal::V8::FatalProcessOutOfMemory(v8::internal::Isolate*, char const*, bool) [node]
 5: 0xce5635  [node]
 6: 0xce5cc6 v8::internal::Heap::RecomputeLimits(v8::internal::GarbageCollector) [node]
 7: 0xcf1b5a v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [node]
 8: 0xcf2a65 v8::internal::Heap::CollectGarbage(v8::internal::AllocationSpace, v8::internal::GarbageCollectionReason, v8::GCCallbackFlags) [node]
 9: 0xcf5478 v8::internal::Heap::AllocateRawWithRetryOrFail(int, v8::internal::AllocationType, v8::internal::AllocationAlignment) [node]
10: 0xcbbda7 v8::internal::Factory::NewFillerObject(int, bool, v8::internal::AllocationType) [node]
11: 0xff1e0b v8::internal::Runtime_AllocateInYoungGeneration(int, unsigned long*, v8::internal::Isolate*) [node]
12: 0x1374fd9  [node]
Aborted

Bull version

3.14.0

Additional information

See also https://github.com/OptimalBits/bull/issues/1304
