class AMQPError extends Error { constructor(message, connection) { super(message); this.name = "AMQPError"; this.connection = connection; } } class AMQPView extends DataView { getUint64(byteOffset, littleEndian) { const left = this.getUint32(byteOffset, littleEndian); const right = this.getUint32(byteOffset + 4, littleEndian); const combined = littleEndian ? left + 2 ** 32 * right : 2 ** 32 * left + right; if (!Number.isSafeInteger(combined)) console.warn(combined, 'exceeds MAX_SAFE_INTEGER. Precision may be lost'); return combined; } setUint64(byteOffset, value, littleEndian) { this.setBigUint64(byteOffset, BigInt(value), littleEndian); } getInt64(byteOffset, littleEndian) { return Number(this.getBigInt64(byteOffset, littleEndian)); } setInt64(byteOffset, value, littleEndian) { this.setBigInt64(byteOffset, BigInt(value), littleEndian); } getShortString(byteOffset) { const len = this.getUint8(byteOffset); byteOffset += 1; if (typeof Buffer !== "undefined") { const text = Buffer.from(this.buffer, this.byteOffset + byteOffset, len).toString(); return [text, len + 1]; } else { const view = new Uint8Array(this.buffer, this.byteOffset + byteOffset, len); const text = new TextDecoder().decode(view); return [text, len + 1]; } } setShortString(byteOffset, string) { if (typeof Buffer !== "undefined") { const len = Buffer.byteLength(string); if (len > 255) throw new Error(`Short string too long, ${len} bytes: ${string.substring(0, 255)}...`); this.setUint8(byteOffset, len); byteOffset += 1; Buffer.from(this.buffer, this.byteOffset + byteOffset, len).write(string); return len + 1; } else { const utf8 = new TextEncoder().encode(string); const len = utf8.byteLength; if (len > 255) throw new Error(`Short string too long, ${len} bytes: ${string.substring(0, 255)}...`); this.setUint8(byteOffset, len); byteOffset += 1; const view = new Uint8Array(this.buffer, this.byteOffset + byteOffset); view.set(utf8); return len + 1; } } getLongString(byteOffset, littleEndian) { const len = this.getUint32(byteOffset, littleEndian); byteOffset += 4; if (typeof Buffer !== "undefined") { const text = Buffer.from(this.buffer, this.byteOffset + byteOffset, len).toString(); return [text, len + 4]; } else { const view = new Uint8Array(this.buffer, this.byteOffset + byteOffset, len); const text = new TextDecoder().decode(view); return [text, len + 4]; } } setLongString(byteOffset, string, littleEndian) { if (typeof Buffer !== "undefined") { const len = Buffer.byteLength(string); this.setUint32(byteOffset, len, littleEndian); byteOffset += 4; Buffer.from(this.buffer, this.byteOffset + byteOffset, len).write(string); return len + 4; } else { const utf8 = new TextEncoder().encode(string); const len = utf8.byteLength; this.setUint32(byteOffset, len, littleEndian); byteOffset += 4; const view = new Uint8Array(this.buffer, this.byteOffset + byteOffset); view.set(utf8); return len + 4; } } getProperties(byteOffset, littleEndian) { let j = byteOffset; const flags = this.getUint16(j, littleEndian); j += 2; const props = {}; if ((flags & 0x8000) > 0) { const [contentType, len] = this.getShortString(j); j += len; props.contentType = contentType; } if ((flags & 0x4000) > 0) { const [contentEncoding, len] = this.getShortString(j); j += len; props.contentEncoding = contentEncoding; } if ((flags & 0x2000) > 0) { const [headers, len] = this.getTable(j, littleEndian); j += len; props.headers = headers; } if ((flags & 0x1000) > 0) { props.deliveryMode = this.getUint8(j); j += 1; } if ((flags & 0x0800) > 0) { props.priority = this.getUint8(j); j += 1; } if ((flags & 0x0400) > 0) { const [correlationId, len] = this.getShortString(j); j += len; props.correlationId = correlationId; } if ((flags & 0x0200) > 0) { const [replyTo, len] = this.getShortString(j); j += len; props.replyTo = replyTo; } if ((flags & 0x0100) > 0) { const [expiration, len] = this.getShortString(j); j += len; props.expiration = expiration; } if ((flags & 0x0080) > 0) { const [messageId, len] = this.getShortString(j); j += len; props.messageId = messageId; } if ((flags & 0x0040) > 0) { props.timestamp = new Date(this.getInt64(j, littleEndian) * 1000); j += 8; } if ((flags & 0x0020) > 0) { const [type, len] = this.getShortString(j); j += len; props.type = type; } if ((flags & 0x0010) > 0) { const [userId, len] = this.getShortString(j); j += len; props.userId = userId; } if ((flags & 0x0008) > 0) { const [appId, len] = this.getShortString(j); j += len; props.appId = appId; } const len = j - byteOffset; return [props, len]; } setProperties(byteOffset, properties, littleEndian) { let j = byteOffset; let flags = 0; if (properties.contentType) flags = flags | 0x8000; if (properties.contentEncoding) flags = flags | 0x4000; if (properties.headers) flags = flags | 0x2000; if (properties.deliveryMode) flags = flags | 0x1000; if (properties.priority) flags = flags | 0x0800; if (properties.correlationId) flags = flags | 0x0400; if (properties.replyTo) flags = flags | 0x0200; if (properties.expiration) flags = flags | 0x0100; if (properties.messageId) flags = flags | 0x0080; if (properties.timestamp) flags = flags | 0x0040; if (properties.type) flags = flags | 0x0020; if (properties.userId) flags = flags | 0x0010; if (properties.appId) flags = flags | 0x0008; this.setUint16(j, flags, littleEndian); j += 2; if (properties.contentType) { j += this.setShortString(j, properties.contentType); } if (properties.contentEncoding) { j += this.setShortString(j, properties.contentEncoding); } if (properties.headers) { j += this.setTable(j, properties.headers); } if (properties.deliveryMode) { this.setUint8(j, properties.deliveryMode); j += 1; } if (properties.priority) { this.setUint8(j, properties.priority); j += 1; } if (properties.correlationId) { j += this.setShortString(j, properties.correlationId); } if (properties.replyTo) { j += this.setShortString(j, properties.replyTo); } if (properties.expiration) { j += this.setShortString(j, properties.expiration); } if (properties.messageId) { j += this.setShortString(j, properties.messageId); } if (properties.timestamp) { const unixEpoch = Math.floor(Number(properties.timestamp) / 1000); this.setInt64(j, unixEpoch, littleEndian); j += 8; } if (properties.type) { j += this.setShortString(j, properties.type); } if (properties.userId) { j += this.setShortString(j, properties.userId); } if (properties.appId) { j += this.setShortString(j, properties.appId); } const len = j - byteOffset; return len; } getTable(byteOffset, littleEndian) { const table = {}; let i = byteOffset; const len = this.getUint32(byteOffset, littleEndian); i += 4; for (; i < byteOffset + 4 + len;) { const [k, strLen] = this.getShortString(i); i += strLen; const [v, vLen] = this.getField(i, littleEndian); i += vLen; table[k] = v; } return [table, len + 4]; } setTable(byteOffset, table, littleEndian) { let i = byteOffset + 4; for (const [key, value] of Object.entries(table)) { if (value === undefined) continue; i += this.setShortString(i, key); i += this.setField(i, value, littleEndian); } this.setUint32(byteOffset, i - byteOffset - 4, littleEndian); return i - byteOffset; } getField(byteOffset, littleEndian) { let i = byteOffset; const k = this.getUint8(i); i += 1; const type = String.fromCharCode(k); let v; let len; switch (type) { case 't': v = this.getUint8(i) === 1; i += 1; break; case 'b': v = this.getInt8(i); i += 1; break; case 'B': v = this.getUint8(i); i += 1; break; case 's': v = this.getInt16(i, littleEndian); i += 2; break; case 'u': v = this.getUint16(i, littleEndian); i += 2; break; case 'I': v = this.getInt32(i, littleEndian); i += 4; break; case 'i': v = this.getUint32(i, littleEndian); i += 4; break; case 'l': v = this.getInt64(i, littleEndian); i += 8; break; case 'f': v = this.getFloat32(i, littleEndian); i += 4; break; case 'd': v = this.getFloat64(i, littleEndian); i += 8; break; case 'S': [v, len] = this.getLongString(i, littleEndian); i += len; break; case 'F': [v, len] = this.getTable(i, littleEndian); i += len; break; case 'A': [v, len] = this.getArray(i, littleEndian); i += len; break; case 'x': [v, len] = this.getByteArray(i, littleEndian); i += len; break; case 'T': v = new Date(this.getInt64(i, littleEndian) * 1000); i += 8; break; case 'V': v = null; break; case 'D': { const scale = this.getUint8(i); i += 1; const value = this.getUint32(i, littleEndian); i += 4; v = value / 10 ** scale; break; } default: throw `Field type '${k}' not supported`; } return [v, i - byteOffset]; } setField(byteOffset, field, littleEndian) { let i = byteOffset; switch (typeof field) { case "string": this.setUint8(i, 'S'.charCodeAt(0)); i += 1; i += this.setLongString(i, field, littleEndian); break; case "boolean": this.setUint8(i, 't'.charCodeAt(0)); i += 1; this.setUint8(i, field ? 1 : 0); i += 1; break; case "bigint": this.setUint8(i, 'l'.charCodeAt(0)); i += 1; this.setBigInt64(i, field, littleEndian); i += 8; break; case "number": if (Number.isInteger(field)) { if (-(2 ** 32) < field && field < 2 ** 32) { this.setUint8(i, 'I'.charCodeAt(0)); i += 1; this.setInt32(i, field, littleEndian); i += 4; } else { this.setUint8(i, 'l'.charCodeAt(0)); i += 1; this.setInt64(i, field, littleEndian); i += 8; } } else { if (-(2 ** 32) < field && field < 2 ** 32) { this.setUint8(i, 'f'.charCodeAt(0)); i += 1; this.setFloat32(i, field, littleEndian); i += 4; } else { this.setUint8(i, 'd'.charCodeAt(0)); i += 1; this.setFloat64(i, field, littleEndian); i += 8; } } break; case "object": if (Array.isArray(field)) { this.setUint8(i, 'A'.charCodeAt(0)); i += 1; i += this.setArray(i, field, littleEndian); } else if (field instanceof Uint8Array) { this.setUint8(i, 'x'.charCodeAt(0)); i += 1; i += this.setByteArray(i, field); } else if (field instanceof ArrayBuffer) { this.setUint8(i, 'x'.charCodeAt(0)); i += 1; i += this.setByteArray(i, new Uint8Array(field)); } else if (field instanceof Date) { this.setUint8(i, 'T'.charCodeAt(0)); i += 1; const unixEpoch = Math.floor(Number(field) / 1000); this.setInt64(i, unixEpoch, littleEndian); i += 8; } else if (field === null || field === undefined) { this.setUint8(i, 'V'.charCodeAt(0)); i += 1; } else { this.setUint8(i, 'F'.charCodeAt(0)); i += 1; i += this.setTable(i, field, littleEndian); } break; default: throw `Unsupported field type '${field}'`; } return i - byteOffset; } getArray(byteOffset, littleEndian) { const len = this.getUint32(byteOffset, littleEndian); byteOffset += 4; const endOffset = byteOffset + len; const v = []; for (; byteOffset < endOffset;) { const [field, fieldLen] = this.getField(byteOffset, littleEndian); byteOffset += fieldLen; v.push(field); } return [v, len + 4]; } setArray(byteOffset, array, littleEndian) { const start = byteOffset; byteOffset += 4; array.forEach((e) => { byteOffset += this.setField(byteOffset, e, littleEndian); }); this.setUint32(start, byteOffset - start - 4, littleEndian); return byteOffset - start; } getByteArray(byteOffset, littleEndian) { const len = this.getUint32(byteOffset, littleEndian); byteOffset += 4; const v = new Uint8Array(this.buffer, this.byteOffset + byteOffset, len); return [v, len + 4]; } setByteArray(byteOffset, data, littleEndian) { this.setUint32(byteOffset, data.byteLength, littleEndian); byteOffset += 4; const view = new Uint8Array(this.buffer, this.byteOffset + byteOffset, data.byteLength); view.set(data); return data.byteLength + 4; } } class AMQPQueue { constructor(channel, name) { this.channel = channel; this.name = name; } bind(exchange, routingKey = "", args = {}) { return new Promise((resolve, reject) => { this.channel.queueBind(this.name, exchange, routingKey, args) .then(() => resolve(this)) .catch(reject); }); } unbind(exchange, routingKey = "", args = {}) { return new Promise((resolve, reject) => { this.channel.queueUnbind(this.name, exchange, routingKey, args) .then(() => resolve(this)) .catch(reject); }); } publish(body, properties = {}) { return new Promise((resolve, reject) => { this.channel.basicPublish("", this.name, body, properties) .then(() => resolve(this)) .catch(reject); }); } subscribe({ noAck = true, exclusive = false, tag = "", args = {} } = {}, callback) { return this.channel.basicConsume(this.name, { noAck, exclusive, tag, args }, callback); } unsubscribe(consumerTag) { return new Promise((resolve, reject) => { this.channel.basicCancel(consumerTag) .then(() => resolve(this)) .catch(reject); }); } delete() { return new Promise((resolve, reject) => { this.channel.queueDelete(this.name) .then(() => resolve(this)) .catch(reject); }); } get({ noAck = true } = {}) { return this.channel.basicGet(this.name, { noAck }); } purge() { return this.channel.queuePurge(this.name); } } class AMQPConsumer { constructor(channel, tag, onMessage) { this.closed = false; this.channel = channel; this.tag = tag; this.onMessage = onMessage; } wait(timeout) { if (this.closedError) return Promise.reject(this.closedError); if (this.closed) return Promise.resolve(); return new Promise((resolve, reject) => { this.resolveWait = resolve; this.rejectWait = reject; if (timeout) { const onTimeout = () => reject(new AMQPError("Timeout", this.channel.connection)); this.timeoutId = setTimeout(onTimeout, timeout); } }); } cancel() { return this.channel.basicCancel(this.tag); } setClosed(err) { this.closed = true; if (err) this.closedError = err; if (this.timeoutId) clearTimeout(this.timeoutId); if (err) { if (this.rejectWait) this.rejectWait(err); } else { if (this.resolveWait) this.resolveWait(); } } } class AMQPChannel { constructor(connection, id) { this.consumers = new Map(); this.promises = []; this.unconfirmedPublishes = []; this.closed = false; this.confirmId = 0; this.connection = connection; this.id = id; } queue(name = "", { passive = false, durable = name !== "", autoDelete = name === "", exclusive = name === "" } = {}, args = {}) { return new Promise((resolve, reject) => { this.queueDeclare(name, { passive, durable, autoDelete, exclusive }, args) .then(({ name }) => resolve(new AMQPQueue(this, name))) .catch(reject); }); } prefetch(prefetchCount) { return this.basicQos(prefetchCount); } onReturn(message) { console.error("Message returned from server", message); } close(reason = "", code = 200) { if (this.closed) return this.rejectClosed(); this.closed = true; let j = 0; const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 20); j += 2; frame.setUint16(j, 40); j += 2; frame.setUint16(j, code); j += 2; j += frame.setShortString(j, reason); frame.setUint16(j, 0); j += 2; frame.setUint16(j, 0); j += 2; frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return this.sendRpc(frame, j); } basicGet(queue, { noAck = true } = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 11); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 70); j += 2; frame.setUint16(j, 0); j += 2; j += frame.setShortString(j, queue); frame.setUint8(j, noAck ? 1 : 0); j += 1; frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return this.sendRpc(frame, j); } basicConsume(queue, { tag = "", noAck = true, exclusive = false, args = {} } = {}, callback) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(4096)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 20); j += 2; frame.setUint16(j, 0); j += 2; j += frame.setShortString(j, queue); j += frame.setShortString(j, tag); let bits = 0; if (noAck) bits = bits | (1 << 1); if (exclusive) bits = bits | (1 << 2); frame.setUint8(j, bits); j += 1; j += frame.setTable(j, args); frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return new Promise((resolve, reject) => { this.sendRpc(frame, j).then((consumerTag) => { const consumer = new AMQPConsumer(this, consumerTag, callback); this.consumers.set(consumerTag, consumer); resolve(consumer); }).catch(reject); }); } basicCancel(tag) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 30); j += 2; j += frame.setShortString(j, tag); frame.setUint8(j, 0); j += 1; frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return new Promise((resolve, reject) => { this.sendRpc(frame, j).then((consumerTag) => { const consumer = this.consumers.get(consumerTag); if (consumer) { consumer.setClosed(); this.consumers.delete(consumerTag); } resolve(this); }).catch(reject); }); } basicAck(deliveryTag, multiple = false) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(21)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 13); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 80); j += 2; frame.setUint64(j, deliveryTag); j += 8; frame.setUint8(j, multiple ? 1 : 0); j += 1; frame.setUint8(j, 206); j += 1; return this.connection.send(new Uint8Array(frame.buffer, 0, 21)); } basicNack(deliveryTag, requeue = false, multiple = false) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(21)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 13); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 120); j += 2; frame.setUint64(j, deliveryTag); j += 8; let bits = 0; if (multiple) bits = bits | (1 << 0); if (requeue) bits = bits | (1 << 1); frame.setUint8(j, bits); j += 1; frame.setUint8(j, 206); j += 1; return this.connection.send(new Uint8Array(frame.buffer, 0, 21)); } basicReject(deliveryTag, requeue = false) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(21)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 13); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 90); j += 2; frame.setUint64(j, deliveryTag); j += 8; frame.setUint8(j, requeue ? 1 : 0); j += 1; frame.setUint8(j, 206); j += 1; return this.connection.send(new Uint8Array(frame.buffer, 0, 21)); } basicRecover(requeue = false) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(13)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 5); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 110); j += 2; frame.setUint8(j, requeue ? 1 : 0); j += 1; frame.setUint8(j, 206); j += 1; return this.sendRpc(frame, j); } async basicPublish(exchange, routingKey, data, properties = {}, mandatory = false, immediate = false) { if (this.closed) return this.rejectClosed(); if (this.connection.blocked) return Promise.reject(new AMQPError(`Connection blocked by server: ${this.connection.blocked}`, this.connection)); let body; if (typeof Buffer !== "undefined" && data instanceof Buffer) { body = data; } else if (data instanceof Uint8Array) { body = data; } else if (data instanceof ArrayBuffer) { body = new Uint8Array(data); } else if (data === null) { body = new Uint8Array(0); } else if (typeof data === "string") { body = this.connection.textEncoder.encode(data); } else { throw new TypeError(`Invalid type ${typeof data} for parameter data`); } let j = 0; const buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax)); buffer.setUint8(j, 1); j += 1; buffer.setUint16(j, this.id); j += 2; j += 4; buffer.setUint16(j, 60); j += 2; buffer.setUint16(j, 40); j += 2; buffer.setUint16(j, 0); j += 2; j += buffer.setShortString(j, exchange); j += buffer.setShortString(j, routingKey); let bits = 0; if (mandatory) bits = bits | (1 << 0); if (immediate) bits = bits | (1 << 1); buffer.setUint8(j, bits); j += 1; buffer.setUint8(j, 206); j += 1; buffer.setUint32(3, j - 8); const headerStart = j; buffer.setUint8(j, 2); j += 1; buffer.setUint16(j, this.id); j += 2; j += 4; buffer.setUint16(j, 60); j += 2; buffer.setUint16(j, 0); j += 2; buffer.setUint32(j, 0); j += 4; buffer.setUint32(j, body.byteLength); j += 4; j += buffer.setProperties(j, properties); buffer.setUint8(j, 206); j += 1; buffer.setUint32(headerStart + 3, j - headerStart - 8); if (body.byteLength === 0) { await this.connection.send(new Uint8Array(buffer.buffer, 0, j)); } else if (j >= buffer.byteLength - 8) { await this.connection.send(new Uint8Array(buffer.buffer, 0, j)); j = 0; } for (let bodyPos = 0; bodyPos < body.byteLength;) { const frameSize = Math.min(body.byteLength - bodyPos, buffer.byteLength - 8 - j); const dataSlice = body.subarray(bodyPos, bodyPos + frameSize); buffer.setUint8(j, 3); j += 1; buffer.setUint16(j, this.id); j += 2; buffer.setUint32(j, frameSize); j += 4; const bodyView = new Uint8Array(buffer.buffer, j, frameSize); bodyView.set(dataSlice); j += frameSize; buffer.setUint8(j, 206); j += 1; await this.connection.send(new Uint8Array(buffer.buffer, 0, j)); bodyPos += frameSize; j = 0; } this.connection.bufferPool.push(buffer); if (this.confirmId) { return new Promise((resolve, reject) => this.unconfirmedPublishes.push([this.confirmId++, resolve, reject])); } else { return Promise.resolve(0); } } basicQos(prefetchCount, prefetchSize = 0, global = false) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(19)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 11); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 10); j += 2; frame.setUint32(j, prefetchSize); j += 4; frame.setUint16(j, prefetchCount); j += 2; frame.setUint8(j, global ? 1 : 0); j += 1; frame.setUint8(j, 206); j += 1; return this.sendRpc(frame, j); } basicFlow(active = true) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(13)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 5); j += 4; frame.setUint16(j, 20); j += 2; frame.setUint16(j, 20); j += 2; frame.setUint8(j, active ? 1 : 0); j += 1; frame.setUint8(j, 206); j += 1; return this.sendRpc(frame, j); } confirmSelect() { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(13)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 5); j += 4; frame.setUint16(j, 85); j += 2; frame.setUint16(j, 10); j += 2; frame.setUint8(j, 0); j += 1; frame.setUint8(j, 206); j += 1; return this.sendRpc(frame, j); } queueDeclare(name = "", { passive = false, durable = name !== "", autoDelete = name === "", exclusive = name === "" } = {}, args = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const declare = new AMQPView(new ArrayBuffer(4096)); declare.setUint8(j, 1); j += 1; declare.setUint16(j, this.id); j += 2; declare.setUint32(j, 0); j += 4; declare.setUint16(j, 50); j += 2; declare.setUint16(j, 10); j += 2; declare.setUint16(j, 0); j += 2; j += declare.setShortString(j, name); let bits = 0; if (passive) bits = bits | (1 << 0); if (durable) bits = bits | (1 << 1); if (exclusive) bits = bits | (1 << 2); if (autoDelete) bits = bits | (1 << 3); declare.setUint8(j, bits); j += 1; j += declare.setTable(j, args); declare.setUint8(j, 206); j += 1; declare.setUint32(3, j - 8); return this.sendRpc(declare, j); } queueDelete(name = "", { ifUnused = false, ifEmpty = false } = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 50); j += 2; frame.setUint16(j, 40); j += 2; frame.setUint16(j, 0); j += 2; j += frame.setShortString(j, name); let bits = 0; if (ifUnused) bits = bits | (1 << 0); if (ifEmpty) bits = bits | (1 << 1); frame.setUint8(j, bits); j += 1; frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return this.sendRpc(frame, j); } queueBind(queue, exchange, routingKey, args = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const bind = new AMQPView(new ArrayBuffer(4096)); bind.setUint8(j, 1); j += 1; bind.setUint16(j, this.id); j += 2; bind.setUint32(j, 0); j += 4; bind.setUint16(j, 50); j += 2; bind.setUint16(j, 20); j += 2; bind.setUint16(j, 0); j += 2; j += bind.setShortString(j, queue); j += bind.setShortString(j, exchange); j += bind.setShortString(j, routingKey); bind.setUint8(j, 0); j += 1; j += bind.setTable(j, args); bind.setUint8(j, 206); j += 1; bind.setUint32(3, j - 8); return this.sendRpc(bind, j); } queueUnbind(queue, exchange, routingKey, args = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const unbind = new AMQPView(new ArrayBuffer(4096)); unbind.setUint8(j, 1); j += 1; unbind.setUint16(j, this.id); j += 2; unbind.setUint32(j, 0); j += 4; unbind.setUint16(j, 50); j += 2; unbind.setUint16(j, 50); j += 2; unbind.setUint16(j, 0); j += 2; j += unbind.setShortString(j, queue); j += unbind.setShortString(j, exchange); j += unbind.setShortString(j, routingKey); j += unbind.setTable(j, args); unbind.setUint8(j, 206); j += 1; unbind.setUint32(3, j - 8); return this.sendRpc(unbind, j); } queuePurge(queue) { if (this.closed) return this.rejectClosed(); let j = 0; const purge = new AMQPView(new ArrayBuffer(512)); purge.setUint8(j, 1); j += 1; purge.setUint16(j, this.id); j += 2; purge.setUint32(j, 0); j += 4; purge.setUint16(j, 50); j += 2; purge.setUint16(j, 30); j += 2; purge.setUint16(j, 0); j += 2; j += purge.setShortString(j, queue); purge.setUint8(j, 0); j += 1; purge.setUint8(j, 206); j += 1; purge.setUint32(3, j - 8); return this.sendRpc(purge, j); } exchangeDeclare(name, type, { passive = false, durable = true, autoDelete = false, internal = false } = {}, args = {}) { let j = 0; const frame = new AMQPView(new ArrayBuffer(4096)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 40); j += 2; frame.setUint16(j, 10); j += 2; frame.setUint16(j, 0); j += 2; j += frame.setShortString(j, name); j += frame.setShortString(j, type); let bits = 0; if (passive) bits = bits | (1 << 0); if (durable) bits = bits | (1 << 1); if (autoDelete) bits = bits | (1 << 2); if (internal) bits = bits | (1 << 3); frame.setUint8(j, bits); j += 1; j += frame.setTable(j, args); frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return this.sendRpc(frame, j); } exchangeDelete(name, { ifUnused = false } = {}) { let j = 0; const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 40); j += 2; frame.setUint16(j, 20); j += 2; frame.setUint16(j, 0); j += 2; j += frame.setShortString(j, name); let bits = 0; if (ifUnused) bits = bits | (1 << 0); frame.setUint8(j, bits); j += 1; frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return this.sendRpc(frame, j); } exchangeBind(destination, source, routingKey = "", args = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const bind = new AMQPView(new ArrayBuffer(4096)); bind.setUint8(j, 1); j += 1; bind.setUint16(j, this.id); j += 2; bind.setUint32(j, 0); j += 4; bind.setUint16(j, 40); j += 2; bind.setUint16(j, 30); j += 2; bind.setUint16(j, 0); j += 2; j += bind.setShortString(j, destination); j += bind.setShortString(j, source); j += bind.setShortString(j, routingKey); bind.setUint8(j, 0); j += 1; j += bind.setTable(j, args); bind.setUint8(j, 206); j += 1; bind.setUint32(3, j - 8); return this.sendRpc(bind, j); } exchangeUnbind(destination, source, routingKey = "", args = {}) { if (this.closed) return this.rejectClosed(); let j = 0; const unbind = new AMQPView(new ArrayBuffer(4096)); unbind.setUint8(j, 1); j += 1; unbind.setUint16(j, this.id); j += 2; unbind.setUint32(j, 0); j += 4; unbind.setUint16(j, 40); j += 2; unbind.setUint16(j, 40); j += 2; unbind.setUint16(j, 0); j += 2; j += unbind.setShortString(j, destination); j += unbind.setShortString(j, source); j += unbind.setShortString(j, routingKey); unbind.setUint8(j, 0); j += 1; j += unbind.setTable(j, args); unbind.setUint8(j, 206); j += 1; unbind.setUint32(3, j - 8); return this.sendRpc(unbind, j); } txSelect() { return this.txMethod(10); } txCommit() { return this.txMethod(20); } txRollback() { return this.txMethod(30); } txMethod(methodId) { if (this.closed) return this.rejectClosed(); let j = 0; const frame = new AMQPView(new ArrayBuffer(12)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, this.id); j += 2; frame.setUint32(j, 4); j += 4; frame.setUint16(j, 90); j += 2; frame.setUint16(j, methodId); j += 2; frame.setUint8(j, 206); j += 1; return this.sendRpc(frame, j); } resolvePromise(value) { const promise = this.promises.shift(); if (promise) { const [resolve,] = promise; resolve(value); return true; } return false; } rejectPromise(err) { const promise = this.promises.shift(); if (promise) { const [, reject] = promise; reject(err); return true; } return false; } sendRpc(frame, frameSize) { return new Promise((resolve, reject) => { this.connection.send(new Uint8Array(frame.buffer, 0, frameSize)) .then(() => this.promises.push([resolve, reject])) .catch(reject); }); } setClosed(err) { err || (err = new Error("Connection closed by client")); if (!this.closed) { this.closed = true; this.consumers.forEach((consumer) => consumer.setClosed(err)); this.consumers.clear(); while (this.rejectPromise(err)) { } this.unconfirmedPublishes.forEach(([, , reject]) => reject(err)); this.unconfirmedPublishes.length = 0; } } rejectClosed() { return Promise.reject(new AMQPError("Channel is closed", this.connection)); } publishConfirmed(deliveryTag, multiple, nack) { const idx = this.unconfirmedPublishes.findIndex(([tag,]) => tag === deliveryTag); if (idx !== -1) { const confirmed = multiple ? this.unconfirmedPublishes.splice(0, idx + 1) : this.unconfirmedPublishes.splice(idx, 1); confirmed.forEach(([tag, resolve, reject]) => { if (nack) reject(new Error("Message rejected")); else resolve(tag); }); } else { console.warn("Cant find unconfirmed deliveryTag", deliveryTag, "multiple:", multiple, "nack:", nack); } } onMessageReady(message) { if (this.delivery) { delete this.delivery; this.deliver(message); } else if (this.getMessage) { delete this.getMessage; this.resolvePromise(message); } else { delete this.returned; this.onReturn(message); } } deliver(message) { queueMicrotask(() => { const consumer = this.consumers.get(message.consumerTag); if (consumer) { consumer.onMessage(message); } else { console.warn("Consumer", message.consumerTag, "not available on channel", this.id); } }); } } class AMQPMessage { constructor(channel) { this.exchange = ""; this.routingKey = ""; this.properties = {}; this.bodySize = 0; this.body = null; this.bodyPos = 0; this.deliveryTag = 0; this.consumerTag = ""; this.redelivered = false; this.channel = channel; } bodyToString() { if (this.body) { if (typeof Buffer !== "undefined") return Buffer.from(this.body).toString(); else return new TextDecoder().decode(this.body); } else { return null; } } bodyString() { return this.bodyToString(); } ack(multiple = false) { return this.channel.basicAck(this.deliveryTag, multiple); } nack(requeue = false, multiple = false) { return this.channel.basicNack(this.deliveryTag, requeue, multiple); } reject(requeue = false) { return this.channel.basicReject(this.deliveryTag, requeue); } cancelConsumer() { return this.channel.basicCancel(this.consumerTag); } } const VERSION = '2.1.0'; class AMQPBaseClient { constructor(vhost, username, password, name, platform, frameMax = 4096, heartbeat = 0) { this.closed = true; this.channelMax = 0; this.textEncoder = new TextEncoder(); this.bufferPool = []; this.vhost = vhost; this.username = username; this.password = ""; Object.defineProperty(this, 'password', { value: password, enumerable: false }); if (name) this.name = name; if (platform) this.platform = platform; this.channels = [new AMQPChannel(this, 0)]; this.onerror = (error) => console.error("amqp-client connection closed", error.message); if (frameMax < 4096) throw new Error("frameMax must be 4096 or larger"); this.frameMax = frameMax; if (heartbeat < 0) throw new Error("heartbeat must be positive"); this.heartbeat = heartbeat; } channel(id) { if (this.closed) return this.rejectClosed(); if (id && id > 0) { const channel = this.channels[id]; if (channel) return Promise.resolve(channel); } if (!id) id = this.channels.findIndex((ch) => ch === undefined); if (id === -1) id = this.channels.length; const channel = new AMQPChannel(this, id); this.channels[id] = channel; let j = 0; const channelOpen = new AMQPView(new ArrayBuffer(13)); channelOpen.setUint8(j, 1); j += 1; channelOpen.setUint16(j, id); j += 2; channelOpen.setUint32(j, 5); j += 4; channelOpen.setUint16(j, 20); j += 2; channelOpen.setUint16(j, 10); j += 2; channelOpen.setUint8(j, 0); j += 1; channelOpen.setUint8(j, 206); j += 1; return new Promise((resolve, reject) => { this.send(new Uint8Array(channelOpen.buffer, 0, 13)) .then(() => channel.promises.push([resolve, reject])) .catch(reject); }); } close(reason = "", code = 200) { if (this.closed) return this.rejectClosed(); this.closed = true; let j = 0; const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, 0); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 10); j += 2; frame.setUint16(j, 50); j += 2; frame.setUint16(j, code); j += 2; j += frame.setShortString(j, reason); frame.setUint16(j, 0); j += 2; frame.setUint16(j, 0); j += 2; frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); return new Promise((resolve, reject) => { this.send(new Uint8Array(frame.buffer, 0, j)) .then(() => this.closePromise = [resolve, reject]) .catch(reject); }); } rejectClosed() { return Promise.reject(new AMQPError("Connection closed", this)); } rejectConnect(err) { if (this.connectPromise) { const [, reject] = this.connectPromise; delete this.connectPromise; reject(err); } this.closed = true; this.closeSocket(); } parseFrames(view) { for (let i = 0; i < view.byteLength;) { let j = 0; const type = view.getUint8(i); i += 1; const channelId = view.getUint16(i); i += 2; const frameSize = view.getUint32(i); i += 4; try { const frameEnd = view.getUint8(i + frameSize); if (frameEnd !== 206) throw (new AMQPError(`Invalid frame end ${frameEnd}, expected 206`, this)); } catch (e) { throw (new AMQPError(`Frame end out of range, frameSize=${frameSize}, pos=${i}, byteLength=${view.byteLength}`, this)); } const channel = this.channels[channelId]; if (!channel) { console.warn("AMQP channel", channelId, "not open"); i += frameSize + 1; continue; } switch (type) { case 1: { const classId = view.getUint16(i); i += 2; const methodId = view.getUint16(i); i += 2; switch (classId) { case 10: { switch (methodId) { case 10: { i += frameSize - 4; const startOk = new AMQPView(new ArrayBuffer(4096)); startOk.setUint8(j, 1); j += 1; startOk.setUint16(j, 0); j += 2; startOk.setUint32(j, 0); j += 4; startOk.setUint16(j, 10); j += 2; startOk.setUint16(j, 11); j += 2; const clientProps = { connection_name: this.name || undefined, product: "amqp-client.js", information: "https://github.com/cloudamqp/amqp-client.js", version: VERSION, platform: this.platform, capabilities: { "authentication_failure_close": true, "basic.nack": true, "connection.blocked": true, "consumer_cancel_notify": true, "exchange_exchange_bindings": true, "per_consumer_qos": true, "publisher_confirms": true, } }; j += startOk.setTable(j, clientProps); j += startOk.setShortString(j, "PLAIN"); const response = `\u0000${this.username}\u0000${this.password}`; j += startOk.setLongString(j, response); j += startOk.setShortString(j, ""); startOk.setUint8(j, 206); j += 1; startOk.setUint32(3, j - 8); this.send(new Uint8Array(startOk.buffer, 0, j)).catch(this.rejectConnect); break; } case 30: { const channelMax = view.getUint16(i); i += 2; const frameMax = view.getUint32(i); i += 4; const heartbeat = view.getUint16(i); i += 2; this.channelMax = channelMax; this.frameMax = this.frameMax === 0 ? frameMax : Math.min(this.frameMax, frameMax); this.heartbeat = this.heartbeat === 0 ? 0 : Math.min(this.heartbeat, heartbeat); const tuneOk = new AMQPView(new ArrayBuffer(20)); tuneOk.setUint8(j, 1); j += 1; tuneOk.setUint16(j, 0); j += 2; tuneOk.setUint32(j, 12); j += 4; tuneOk.setUint16(j, 10); j += 2; tuneOk.setUint16(j, 31); j += 2; tuneOk.setUint16(j, this.channelMax); j += 2; tuneOk.setUint32(j, this.frameMax); j += 4; tuneOk.setUint16(j, this.heartbeat); j += 2; tuneOk.setUint8(j, 206); j += 1; this.send(new Uint8Array(tuneOk.buffer, 0, j)).catch(this.rejectConnect); j = 0; const open = new AMQPView(new ArrayBuffer(512)); open.setUint8(j, 1); j += 1; open.setUint16(j, 0); j += 2; open.setUint32(j, 0); j += 4; open.setUint16(j, 10); j += 2; open.setUint16(j, 40); j += 2; j += open.setShortString(j, this.vhost); open.setUint8(j, 0); j += 1; open.setUint8(j, 0); j += 1; open.setUint8(j, 206); j += 1; open.setUint32(3, j - 8); this.send(new Uint8Array(open.buffer, 0, j)).catch(this.rejectConnect); break; } case 41: { i += 1; this.closed = false; const promise = this.connectPromise; if (promise) { const [resolve,] = promise; delete this.connectPromise; resolve(this); } break; } case 50: { const code = view.getUint16(i); i += 2; const [text, strLen] = view.getShortString(i); i += strLen; const classId = view.getUint16(i); i += 2; const methodId = view.getUint16(i); i += 2; console.debug("connection closed by server", code, text, classId, methodId); const msg = `connection closed: ${text} (${code})`; const err = new AMQPError(msg, this); this.channels.forEach((ch) => ch.setClosed(err)); this.channels = [new AMQPChannel(this, 0)]; const closeOk = new AMQPView(new ArrayBuffer(12)); closeOk.setUint8(j, 1); j += 1; closeOk.setUint16(j, 0); j += 2; closeOk.setUint32(j, 4); j += 4; closeOk.setUint16(j, 10); j += 2; closeOk.setUint16(j, 51); j += 2; closeOk.setUint8(j, 206); j += 1; this.send(new Uint8Array(closeOk.buffer, 0, j)) .catch(err => console.warn("Error while sending Connection#CloseOk", err)); this.onerror(err); this.rejectConnect(err); break; } case 51: { this.channels.forEach((ch) => ch.setClosed()); this.channels = [new AMQPChannel(this, 0)]; const promise = this.closePromise; if (promise) { const [resolve,] = promise; delete this.closePromise; resolve(); this.closeSocket(); } break; } case 60: { const [reason, len] = view.getShortString(i); i += len; console.warn("AMQP connection blocked:", reason); this.blocked = reason; break; } case 61: { console.info("AMQP connection unblocked"); delete this.blocked; break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } case 20: { switch (methodId) { case 11: { i += 4; channel.resolvePromise(channel); break; } case 21: { const active = view.getUint8(i) !== 0; i += 1; channel.resolvePromise(active); break; } case 40: { const code = view.getUint16(i); i += 2; const [text, strLen] = view.getShortString(i); i += strLen; const classId = view.getUint16(i); i += 2; const methodId = view.getUint16(i); i += 2; console.debug("channel", channelId, "closed", code, text, classId, methodId); const msg = `channel ${channelId} closed: ${text} (${code})`; const err = new AMQPError(msg, this); channel.setClosed(err); delete this.channels[channelId]; const closeOk = new AMQPView(new ArrayBuffer(12)); closeOk.setUint8(j, 1); j += 1; closeOk.setUint16(j, channelId); j += 2; closeOk.setUint32(j, 4); j += 4; closeOk.setUint16(j, 20); j += 2; closeOk.setUint16(j, 41); j += 2; closeOk.setUint8(j, 206); j += 1; this.send(new Uint8Array(closeOk.buffer, 0, j)) .catch(err => console.error("Error while sending Channel#closeOk", err)); break; } case 41: { channel.setClosed(); delete this.channels[channelId]; channel.resolvePromise(); break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } case 40: { switch (methodId) { case 11: case 21: case 31: case 51: { channel.resolvePromise(); break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } case 50: { switch (methodId) { case 11: { const [name, strLen] = view.getShortString(i); i += strLen; const messageCount = view.getUint32(i); i += 4; const consumerCount = view.getUint32(i); i += 4; channel.resolvePromise({ name, messageCount, consumerCount }); break; } case 21: { channel.resolvePromise(); break; } case 31: { const messageCount = view.getUint32(i); i += 4; channel.resolvePromise({ messageCount }); break; } case 41: { const messageCount = view.getUint32(i); i += 4; channel.resolvePromise({ messageCount }); break; } case 51: { channel.resolvePromise(); break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } case 60: { switch (methodId) { case 11: { channel.resolvePromise(); break; } case 21: { const [consumerTag, len] = view.getShortString(i); i += len; channel.resolvePromise(consumerTag); break; } case 30: { const [consumerTag, len] = view.getShortString(i); i += len; const noWait = view.getUint8(i) === 1; i += 1; const consumer = channel.consumers.get(consumerTag); if (consumer) { consumer.setClosed(new AMQPError("Consumer cancelled by the server", this)); channel.consumers.delete(consumerTag); } if (!noWait) { const frame = new AMQPView(new ArrayBuffer(512)); frame.setUint8(j, 1); j += 1; frame.setUint16(j, channel.id); j += 2; frame.setUint32(j, 0); j += 4; frame.setUint16(j, 60); j += 2; frame.setUint16(j, 31); j += 2; j += frame.setShortString(j, consumerTag); frame.setUint8(j, 206); j += 1; frame.setUint32(3, j - 8); this.send(new Uint8Array(frame.buffer, 0, j)); } break; } case 31: { const [consumerTag, len] = view.getShortString(i); i += len; channel.resolvePromise(consumerTag); break; } case 50: { const code = view.getUint16(i); i += 2; const [text, len] = view.getShortString(i); i += len; const [exchange, exchangeLen] = view.getShortString(i); i += exchangeLen; const [routingKey, routingKeyLen] = view.getShortString(i); i += routingKeyLen; const message = new AMQPMessage(channel); message.exchange = exchange; message.routingKey = routingKey; message.replyCode = code; message.replyText = text; channel.returned = message; break; } case 60: { const [consumerTag, consumerTagLen] = view.getShortString(i); i += consumerTagLen; const deliveryTag = view.getUint64(i); i += 8; const redelivered = view.getUint8(i) === 1; i += 1; const [exchange, exchangeLen] = view.getShortString(i); i += exchangeLen; const [routingKey, routingKeyLen] = view.getShortString(i); i += routingKeyLen; const message = new AMQPMessage(channel); message.consumerTag = consumerTag; message.deliveryTag = deliveryTag; message.exchange = exchange; message.routingKey = routingKey; message.redelivered = redelivered; channel.delivery = message; break; } case 71: { const deliveryTag = view.getUint64(i); i += 8; const redelivered = view.getUint8(i) === 1; i += 1; const [exchange, exchangeLen] = view.getShortString(i); i += exchangeLen; const [routingKey, routingKeyLen] = view.getShortString(i); i += routingKeyLen; const messageCount = view.getUint32(i); i += 4; const message = new AMQPMessage(channel); message.deliveryTag = deliveryTag; message.redelivered = redelivered; message.exchange = exchange; message.routingKey = routingKey; message.messageCount = messageCount; channel.getMessage = message; break; } case 72: { const [, len] = view.getShortString(i); i += len; channel.resolvePromise(null); break; } case 80: { const deliveryTag = view.getUint64(i); i += 8; const multiple = view.getUint8(i) === 1; i += 1; channel.publishConfirmed(deliveryTag, multiple, false); break; } case 111: { channel.resolvePromise(); break; } case 120: { const deliveryTag = view.getUint64(i); i += 8; const multiple = view.getUint8(i) === 1; i += 1; channel.publishConfirmed(deliveryTag, multiple, true); break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } case 85: { switch (methodId) { case 11: { channel.confirmId = 1; channel.resolvePromise(); break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } case 90: { switch (methodId) { case 11: case 21: case 31: { channel.resolvePromise(); break; } default: i += frameSize - 4; console.error("unsupported class/method id", classId, methodId); } break; } default: i += frameSize - 2; console.error("unsupported class id", classId); } break; } case 2: { i += 4; const bodySize = view.getUint64(i); i += 8; const [properties, propLen] = view.getProperties(i); i += propLen; const message = channel.delivery || channel.getMessage || channel.returned; if (message) { message.bodySize = bodySize; message.properties = properties; message.body = new Uint8Array(bodySize); if (bodySize === 0) channel.onMessageReady(message); } else { console.warn("Header frame but no message"); } break; } case 3: { const message = channel.delivery || channel.getMessage || channel.returned; if (message && message.body) { const bodyPart = new Uint8Array(view.buffer, view.byteOffset + i, frameSize); message.body.set(bodyPart, message.bodyPos); message.bodyPos += frameSize; i += frameSize; if (message.bodyPos === message.bodySize) channel.onMessageReady(message); } else { console.warn("Body frame but no message"); } break; } case 8: { const heartbeat = new Uint8Array([8, 0, 0, 0, 0, 0, 0, 206]); this.send(heartbeat).catch(err => console.warn("Error while sending heartbeat", err)); break; } default: console.error("invalid frame type:", type); i += frameSize; } i += 1; } } } class AMQPWebSocketClient extends AMQPBaseClient { constructor(url, vhost = "/", username = "guest", password = "guest", name, frameMax = 4096, heartbeat = 0) { super(vhost, username, password, name, AMQPWebSocketClient.platform(), frameMax, heartbeat); this.framePos = 0; this.frameSize = 0; this.url = url; this.frameBuffer = new Uint8Array(frameMax); } connect() { const socket = new WebSocket(this.url); this.socket = socket; socket.binaryType = "arraybuffer"; socket.onmessage = this.handleMessage.bind(this); return new Promise((resolve, reject) => { this.connectPromise = [resolve, reject]; socket.onclose = reject; socket.onerror = reject; socket.onopen = () => { socket.onerror = (ev) => this.onerror(new AMQPError(ev.toString(), this)); socket.send(new Uint8Array([65, 77, 81, 80, 0, 0, 9, 1])); }; }); } send(bytes) { return new Promise((resolve, reject) => { if (this.socket) { try { this.socket.send(bytes); resolve(); } catch (err) { this.closeSocket(); reject(err); } } else { reject("Socket not connected"); } }); } closeSocket() { this.closed = true; if (this.socket) this.socket.close(); this.socket = undefined; } handleMessage(event) { const buf = event.data; const bufView = new DataView(buf); let bufPos = 0; while (bufPos < buf.byteLength) { if (this.frameSize === 0) { if (this.framePos !== 0) { const len = buf.byteLength - bufPos; this.frameBuffer.set(new Uint8Array(buf, bufPos), this.framePos); this.frameSize = new DataView(this.frameBuffer).getInt32(bufPos + 3) + 8; this.framePos += len; bufPos += len; continue; } if (bufPos + 3 + 4 > buf.byteLength) { const len = buf.byteLength - bufPos; this.frameBuffer.set(new Uint8Array(buf, bufPos), this.framePos); this.framePos += len; break; } this.frameSize = bufView.getInt32(bufPos + 3) + 8; if (buf.byteLength - bufPos >= this.frameSize) { const view = new AMQPView(buf, bufPos, this.frameSize); this.parseFrames(view); bufPos += this.frameSize; this.frameSize = 0; continue; } } const leftOfFrame = this.frameSize - this.framePos; const copyBytes = Math.min(leftOfFrame, buf.byteLength - bufPos); this.frameBuffer.set(new Uint8Array(buf, bufPos, copyBytes), this.framePos); this.framePos += copyBytes; bufPos += copyBytes; if (this.framePos === this.frameSize) { const view = new AMQPView(this.frameBuffer.buffer, 0, this.frameSize); this.parseFrames(view); this.frameSize = this.framePos = 0; } } } static platform() { if (typeof (window) !== 'undefined') return window.navigator.userAgent; else return `${process.release.name} ${process.version} ${process.platform} ${process.arch}`; } } export { AMQPWebSocketClient }; //# sourceMappingURL=amqp-websocket-client.js.map