-
-
Notifications
You must be signed in to change notification settings - Fork 86
upgrade to v3.0.0 and add compatibility with new WhatsMeow provider #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds WhatsMeow provider support across UI and backend: new session command controller/endpoints, adapter-based connect/qr/disconnect flows, AMQP routing for provider.* with outgoing queues, WhatsMeow bridging consumer, media handling in outgoing, config/defaults updates, and provider defaults. Includes frontend modal/provider selection and dynamic session actions. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor User as UI User
participant UI as Frontend (public/index.html)
participant API as SessionCommandController
participant Adapter as WhatsMeow Adapter
participant Local as Local Services (Reload/Logout)
rect rgb(240,248,255)
note over UI: Start WhatsMeow connect flow
User->>UI: Click "Conectar" (provider=whatsmeow)
UI->>API: POST /sessions/:phone/connect
API-->>Adapter: POST /connect
Adapter-->>API: 204/200
API-->>UI: 204
UI->>API: GET /sessions/:phone/qr (poll)
API-->>Adapter: GET /qr
Adapter-->>API: QR (binary)
API-->>UI: base64 QR
UI->>API: GET /sessions/:phone/status (poll)
API-->>UI: preparing/reading_qr/connected
end
rect rgb(255,248,240)
note over UI,API: Non-WhatsMeow connect
User->>UI: Click "Conectar" (provider=baileys)
UI->>API: POST /sessions/:phone/connect
API-->>Local: Reload.execute
Local-->>API: ok
API-->>UI: 204
end
sequenceDiagram
autonumber
participant Ext as External Source
participant Bridge as IncomingWhatsmeow
participant AMQP as Exchange unoapi.outgoing
participant OutQ as outgoing.whatsmeow/outgoing.baileys
participant Worker as IncomingJob/IncomingBaileys
participant Webhook as Webhook Endpoint
rect rgb(245,255,250)
Ext->>Bridge: Provider event (payload)
Bridge->>AMQP: publish routingKey=provider.whatsmeow.{phone}
AMQP-->>OutQ: route to outgoing.whatsmeow
OutQ-->>Worker: consume message
Worker->>Webhook: POST normalized payload
end
sequenceDiagram
autonumber
participant WA as WhatsApp Media
participant Out as jobs/outgoing.ts
participant FS as File Store
participant WT as Webhook
note over Out: WhatsMeow media handling path
WA-->>Out: message with mediaKey/directPath/url
Out->>Out: downloadContentFromMessage + decrypt
Out->>FS: save buffer -> file
Out->>Out: attach media id/mime_type
Out-->>WT: POST after short delay
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
src/services/config_redis.ts (1)
32-38: Preserve falsy webhookForward overrides
if (!webhookForward[k])treats legitimate falsy redis overrides (e.g.,false,0, empty string) as “missing” and silently restores the env default, so toggles that should stay disabled are re-enabled on load. We should only back-fill keys that are actually undefined/null, and we should iterate the default keys to cover values absent from Redis entirely.- } else if (key === 'webhookForward') { - const webhookForward = configRedis[key] - Object.keys(configRedis[key]).forEach((k) => { - if (!webhookForward[k]) { - webhookForward[k] = config[key][k] - } - }) - configRedis[key] = webhookForward + } else if (key === 'webhookForward') { + const defaults = config[key] || {} + const webhookForward = configRedis[key] || {} + Object.keys(defaults).forEach((k) => { + if (webhookForward[k] === undefined) { + webhookForward[k] = defaults[k] + } + }) + configRedis[key] = webhookForward }src/services/client_baileys.ts (1)
603-721: WirePresignedLinkValidatorinto the send flow.We define a comprehensive
PresignedLinkValidator, but the media send path still does a plainHEADrequest (and the catch block repeats it). As a result, presigned URLs continue to fail—none of the new retry/GET-range logic ever runs. Please replace these directfetch(..., { method: 'HEAD' })probes withawait PresignedLinkValidator.validateLink(link)(and handle its boolean/exception result), both in the primary validation block and in the upload-failure retry path.This will ensure the WhatsMeow presigned links are actually supported instead of still tripping the old validation.
src/jobs/bind_bridge.ts (1)
37-40: Add WhatsMeow to the allowed provider list.Line 37 still filters
config.providerto only'forwarder'and'baileys'. With the new WhatsMeow integration, any connection whoseprovideris'whatsmeow'will hit this guard and bail out, soBindBridgeJobnever binds queues for it. That prevents the new provider from ever coming online.Please extend the allowed set to include
'whatsmeow'(or remove the guard if it is no longer necessary) so the bridge job runs for the WhatsMeow routes introduced in this upgrade.- if (config.provider && !['forwarder', 'baileys'].includes(config.provider!)) { + if (config.provider && !['forwarder', 'baileys', 'whatsmeow'].includes(config.provider!)) {src/broker.ts (1)
58-63: ReassesssendDefaultPii: truein Sentry.Phone numbers and message metadata are PII. Consider redaction via
beforeSendor disabling PII in production.Sentry.init({ dsn: process.env.SENTRY_DSN, - sendDefaultPii: true, + sendDefaultPii: false, + beforeSend(event) { + // redact known fields (example) + if (event.user) event.user = { id: event.user.id } + return event + }, })
🧹 Nitpick comments (7)
src/services/session_store.ts (1)
22-40: Prefer strict equality for status checks.Since
getStatusresolves to a string union, using===keeps comparisons idiomatic and guards against accidental coercion. Consider updating the set below (and similar helpers) when you next touch this code.- return (await this.getStatus(phone)) == 'online' + return (await this.getStatus(phone)) === 'online' ... - return (await this.getStatus(phone)) == 'connecting' + return (await this.getStatus(phone)) === 'connecting' ... - return (await this.getStatus(phone)) == 'offline' + return (await this.getStatus(phone)) === 'offline' ... - return (await this.getStatus(phone)) == 'disconnected' + return (await this.getStatus(phone)) === 'disconnected' ... - return (await this.getStatus(phone)) == 'restart_required' + return (await this.getStatus(phone)) === 'restart_required' ... - return (await this.getStatus(phone)) == 'standby' + return (await this.getStatus(phone)) === 'standby'src/services/media_store_s3.ts (1)
61-61: Keep the error object in the log entry
By interpolating onlyerror.message, we lose the stack trace and structured context pino provides when the error object is passed as the first argument. Please keep emitting the full error so investigators can see stacks in production logs.- logger.error(`Error on generate s3 signed url for bucket: ${bucket} file name: ${fileName} expires in: ${expiresIn} -> ${error.message}`) + logger.error(error, `Error on generate s3 signed url for bucket: ${bucket} file name: ${fileName} expires in: ${expiresIn}`)src/jobs/listener.ts (2)
20-20: Fix options type:priorityshould be numeric and optional.
priority: 0narrows the type to the literal 0. Make itpriority?: number(or remove it entirely—amqpConsume only passes count/max retries).- async consume(phone: string, data: object, options?: { countRetries: number; maxRetries: number; priority: 0 }) { + async consume(phone: string, data: object, options?: { countRetries: number; maxRetries: number; priority?: number }) {
52-53: Include message id in retry log for traceability.- logger.warn('Decrypt error message, try again...') + logger.warn('Decrypt error message %s, retrying...', (error as any)?.getMessageId?.() || '')src/broker.ts (1)
128-133: Clarify ownership of WhatsMeow queues/bindings to avoid split-brain declarations.You assert/bind
outgoing.whatsmeowhere whileprovider-whatsmeowmay manage its own resources. Pick one owner (adapter vs core) and document it.src/services/incoming_amqp.ts (2)
10-21: Deduplicate exchange/queue assertions; keep them in one place (broker or amqp helpers).Both this file and broker.ts assert
unoapi.outgoingandoutgoing.baileys. Centralize to reduce drift.
82-83: Throw an Error, not a string.Improves stack traces and Sentry grouping.
- throw `Unknown incoming message ${JSON.stringify(pl)}` + throw new Error(`Unknown incoming message ${JSON.stringify(pl)}`)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
yarn.lockis excluded by!**/yarn.lock,!**/*.lock
📒 Files selected for processing (75)
.gitignore(1 hunks).tool-versions(1 hunks)__tests__/jobs/timer.ts(2 hunks)__tests__/routes/blacklist.ts(1 hunks)__tests__/services/blacklist.ts(0 hunks)__tests__/services/media_store_file.ts(2 hunks)__tests__/services/outgoing_cloud_api.ts(3 hunks)__tests__/services/session_store_file.ts(2 hunks)__tests__/services/socket.ts(2 hunks)__tests__/services/transformer.ts(15 hunks)jest.config.js(1 hunks)package.json(2 hunks)public/index.html(9 hunks)src/app.ts(2 hunks)src/bridge.ts(2 hunks)src/broker.ts(4 hunks)src/controllers/index_controller.ts(0 hunks)src/controllers/marketing_messages_controller.ts(0 hunks)src/controllers/pairing_code_controller.ts(1 hunks)src/controllers/phone_number_controller.ts(2 hunks)src/controllers/registration_controller.ts(1 hunks)src/controllers/session_command_controller.ts(1 hunks)src/controllers/templates_controller.ts(0 hunks)src/controllers/webhook_controller.ts(1 hunks)src/controllers/webhook_fake_controller.ts(0 hunks)src/defaults.ts(5 hunks)src/i18n.ts(1 hunks)src/index.ts(2 hunks)src/jobs/bind_bridge.ts(2 hunks)src/jobs/bulk_parser.ts(1 hunks)src/jobs/bulk_report.ts(2 hunks)src/jobs/bulk_sender.ts(2 hunks)src/jobs/commander.ts(3 hunks)src/jobs/listener.ts(3 hunks)src/jobs/outgoing.ts(2 hunks)src/jobs/reload.ts(1 hunks)src/jobs/timer.ts(2 hunks)src/jobs/webhook_status_failed.ts(2 hunks)src/locales/en.json(1 hunks)src/locales/pt.json(1 hunks)src/locales/pt_BR.json(1 hunks)src/router.ts(3 hunks)src/services/auto_connect.ts(1 hunks)src/services/blacklist.ts(2 hunks)src/services/broadcast.ts(0 hunks)src/services/client.ts(3 hunks)src/services/client_baileys.ts(8 hunks)src/services/client_forward.ts(4 hunks)src/services/config.ts(3 hunks)src/services/config_by_env.ts(1 hunks)src/services/config_redis.ts(2 hunks)src/services/contact_baileys.ts(2 hunks)src/services/contact_dummy.ts(1 hunks)src/services/data_store.ts(1 hunks)src/services/data_store_redis.ts(1 hunks)src/services/incoming_amqp.ts(3 hunks)src/services/listener_amqp.ts(2 hunks)src/services/logout_amqp.ts(1 hunks)src/services/media_store_file.ts(3 hunks)src/services/media_store_s3.ts(4 hunks)src/services/outgoing_amqp.ts(1 hunks)src/services/outgoing_cloud_api.ts(1 hunks)src/services/redis.ts(4 hunks)src/services/reload_amqp.ts(1 hunks)src/services/reload_baileys.ts(1 hunks)src/services/send_error.ts(1 hunks)src/services/session_store.ts(2 hunks)src/services/session_store_redis.ts(5 hunks)src/services/socket.ts(15 hunks)src/services/store.ts(1 hunks)src/services/store_file.ts(1 hunks)src/services/timer.ts(1 hunks)src/services/transformer.ts(1 hunks)src/standalone.ts(9 hunks)src/web.ts(2 hunks)
💤 Files with no reviewable changes (6)
- src/controllers/webhook_fake_controller.ts
- src/controllers/marketing_messages_controller.ts
- src/controllers/templates_controller.ts
- tests/services/blacklist.ts
- src/controllers/index_controller.ts
- src/services/broadcast.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-27T11:42:53.193Z
Learnt from: PP-Contrib
PR: clairton/unoapi-cloud#91
File: src/jobs/webhooker.ts:65-67
Timestamp: 2024-11-27T11:42:53.193Z
Learning: In `src/jobs/webhooker.ts`, error handling for single webhook HTTP requests in the `WebhookerJob` class is managed by RabbitMQ, so additional error handling in the `consume` method is unnecessary.
Applied to files:
src/jobs/bulk_parser.tssrc/jobs/bind_bridge.tssrc/jobs/webhook_status_failed.tssrc/standalone.ts
🧬 Code graph analysis (37)
src/jobs/bulk_parser.ts (3)
src/amqp.ts (4)
amqpPublish(210-261)_(66-66)payload(279-332)amqpChannel(114-122)src/defaults.ts (1)
UNOAPI_EXCHANGE_BROKER_NAME(81-81)src/services/broadcast_amqp.ts (2)
BroadcastAmqp(5-10)send(6-9)
src/services/logout_amqp.ts (3)
src/amqp.ts (1)
amqpPublish(210-261)src/defaults.ts (2)
UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)UNOAPI_QUEUE_LOGOUT(99-99)src/services/broadcast_amqp.ts (1)
BroadcastAmqp(5-10)
src/bridge.ts (3)
src/amqp.ts (4)
amqpConsume(263-347)ConsumeCallback(84-86)payload(279-332)exchange(210-261)src/defaults.ts (5)
UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)UNOAPI_QUEUE_BIND(92-92)UNOAPI_SERVER_NAME(69-69)UNOAPI_QUEUE_RELOAD(97-97)UNOAPI_QUEUE_LOGOUT(99-99)src/waker.ts (2)
queue(44-67)Promise(42-69)
__tests__/services/socket.ts (1)
src/services/config.ts (1)
defaultConfig(93-161)
src/index.ts (2)
src/app.ts (1)
App(21-101)src/defaults.ts (1)
BASE_URL(119-119)
src/services/session_store_redis.ts (2)
src/services/redis.ts (2)
getSessionStatus(223-226)redisGet(45-57)src/defaults.ts (1)
MAX_CONNECT_RETRY(115-115)
src/services/auto_connect.ts (2)
src/defaults.ts (1)
UNOAPI_SERVER_NAME(69-69)src/services/store.ts (1)
store(21-23)
src/services/outgoing_amqp.ts (2)
src/defaults.ts (1)
UNOAPI_QUEUE_OUTGOING(94-94)src/services/broadcast_amqp.ts (2)
BroadcastAmqp(5-10)send(6-9)
__tests__/routes/blacklist.ts (2)
src/services/blacklist.ts (1)
addToBlacklist(11-13)src/jobs/add_to_blacklist.ts (1)
addToBlacklist(4-8)
src/jobs/bulk_sender.ts (2)
src/defaults.ts (2)
UNOAPI_QUEUE_BULK_REPORT(102-102)UNOAPI_BULK_DELAY(114-114)src/amqp.ts (3)
payload(279-332)_(66-66)exchange(210-261)
src/router.ts (4)
src/controllers/session_command_controller.ts (2)
SessionCommandController(9-89)reload(54-69)src/services/config.ts (1)
getConfig(163-165)src/services/socket.ts (1)
logout(103-105)src/app.ts (1)
router(69-100)
src/jobs/listener.ts (2)
src/defaults.ts (3)
UNOAPI_SERVER_NAME(69-69)UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)UNOAPI_QUEUE_LISTENER(89-89)src/amqp.ts (1)
amqpPublish(210-261)
src/jobs/outgoing.ts (2)
src/services/store.ts (1)
store(21-23)src/services/transformer.ts (3)
isUpdateMessage(479-482)TYPE_MESSAGES_MEDIA(12-12)jidToPhoneNumber(518-535)
src/services/session_store.ts (1)
src/defaults.ts (1)
MAX_CONNECT_RETRY(115-115)
src/services/media_store_file.ts (1)
src/defaults.ts (1)
FETCH_TIMEOUT_MS(44-44)
src/jobs/commander.ts (2)
src/defaults.ts (4)
UNOAPI_EXCHANGE_BROKER_NAME(81-81)UNOAPI_QUEUE_BULK_PARSER(96-96)UNOAPI_QUEUE_RELOAD(97-97)UNOAPI_QUEUE_BULK_REPORT(102-102)src/amqp.ts (1)
amqpPublish(210-261)
src/services/timer.ts (3)
src/services/redis.ts (1)
setLastTimer(451-455)src/amqp.ts (1)
amqpPublish(210-261)src/controllers/timer_controller.ts (1)
stop(6-14)
__tests__/services/media_store_file.ts (2)
src/services/media_store_file.ts (1)
getMediaStoreFile(18-27)src/services/config.ts (1)
defaultConfig(93-161)
src/web.ts (1)
src/defaults.ts (3)
PORT(118-118)CONFIG_SESSION_PHONE_CLIENT(162-162)CONFIG_SESSION_PHONE_NAME(163-163)
src/controllers/session_command_controller.ts (5)
src/services/config.ts (1)
getConfig(163-165)src/services/redis.ts (1)
getConfig(282-289)src/services/reload.ts (1)
Reload(8-17)src/services/logout.ts (1)
Logout(1-3)src/defaults.ts (1)
WHATSOMEOW_ADAPTER_BASEURL(37-38)
src/jobs/bind_bridge.ts (2)
src/defaults.ts (4)
UNOAPI_QUEUE_LISTENER(89-89)UNOAPI_SERVER_NAME(69-69)UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)UNOAPI_QUEUE_INCOMING(105-105)src/amqp.ts (1)
amqpConsume(263-347)
src/services/blacklist.ts (1)
src/services/redis.ts (1)
redisTtl(59-71)
src/services/listener_amqp.ts (3)
src/services/listener.ts (2)
Listener(3-5)eventType(1-1)src/amqp.ts (2)
PublishOption(78-82)amqpPublish(210-261)src/defaults.ts (3)
UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)UNOAPI_QUEUE_LISTENER(89-89)UNOAPI_SERVER_NAME(69-69)
src/services/contact_dummy.ts (2)
src/services/client.ts (1)
Contact(10-14)src/services/contact.ts (2)
Contact(7-9)ContactResponse(3-5)
src/services/data_store_redis.ts (1)
src/defaults.ts (1)
ONLY_HELLO_TEMPLATE(168-168)
src/defaults.ts (1)
src/services/listener_amqp.ts (1)
process(39-45)
src/services/reload_amqp.ts (2)
src/amqp.ts (1)
amqpPublish(210-261)src/defaults.ts (3)
UNOAPI_EXCHANGE_BROKER_NAME(81-81)UNOAPI_QUEUE_RELOAD(97-97)UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)
src/services/media_store_s3.ts (2)
src/amqp.ts (1)
amqpPublish(210-261)src/defaults.ts (4)
UNOAPI_EXCHANGE_BROKER_NAME(81-81)UNOAPI_QUEUE_MEDIA(87-87)DATA_TTL(75-75)FETCH_TIMEOUT_MS(44-44)
src/broker.ts (6)
src/amqp.ts (3)
amqpConsume(263-347)amqpGetChannel(114-122)extractRoutingKeyFromBindingKey(56-59)src/services/listener_amqp.ts (1)
ListenerAmqp(38-46)src/services/on_new_login_generate_token.ts (1)
onNewLoginGenerateToken(8-33)src/services/incoming_baileys.ts (1)
IncomingBaileys(8-31)src/services/client_baileys.ts (1)
getClientBaileys(170-204)src/jobs/incoming.ts (1)
IncomingJob(12-189)
src/services/incoming_amqp.ts (2)
src/amqp.ts (1)
amqpGetChannel(114-122)src/services/transformer.ts (2)
getGroupId(435-450)jidToPhoneNumber(518-535)
src/services/socket.ts (4)
src/i18n.ts (1)
t(15-18)src/services/client_baileys.ts (2)
logout(536-540)connect(380-432)src/services/transformer.ts (1)
isIndividualJid(333-337)src/defaults.ts (1)
LOG_LEVEL(14-14)
src/services/client_baileys.ts (3)
src/services/send_error.ts (1)
SendError(1-9)src/i18n.ts (1)
t(15-18)src/defaults.ts (1)
FETCH_TIMEOUT_MS(44-44)
src/standalone.ts (7)
src/services/redis.ts (1)
startRedis(12-31)src/amqp.ts (4)
amqpConnect(88-107)amqpConsume(263-347)amqpGetChannel(114-122)extractRoutingKeyFromBindingKey(56-59)src/defaults.ts (9)
UNOAPI_EXCHANGE_BRIDGE_NAME(82-82)UNOAPI_QUEUE_BIND(92-92)UNOAPI_SERVER_NAME(69-69)UNOAPI_QUEUE_RELOAD(97-97)UNOAPI_QUEUE_LOGOUT(99-99)UNOAPI_EXCHANGE_BROKER_NAME(81-81)UNOAPI_QUEUE_OUTGOING(94-94)UNOAPI_QUEUE_NOTIFICATION(88-88)UNOAPI_QUEUE_BLACKLIST_ADD(90-90)src/jobs/notification.ts (1)
NotificationJob(3-18)src/services/incoming_baileys.ts (1)
IncomingBaileys(8-31)src/services/client_baileys.ts (1)
getClientBaileys(170-204)src/jobs/incoming.ts (1)
IncomingJob(12-189)
src/jobs/bulk_report.ts (2)
src/amqp.ts (1)
amqpPublish(210-261)src/defaults.ts (3)
UNOAPI_EXCHANGE_BROKER_NAME(81-81)UNOAPI_QUEUE_BULK_REPORT(102-102)UNOAPI_BULK_DELAY(114-114)
src/services/client.ts (3)
src/services/listener.ts (1)
Listener(3-5)src/services/config.ts (1)
getConfig(163-165)src/services/socket.ts (1)
OnNewLogin(71-71)
src/services/store.ts (1)
src/services/data_store.ts (1)
DataStore(25-56)
src/services/redis.ts (3)
src/services/config.ts (1)
Webhook(16-31)src/services/session_store.ts (1)
getConnectCount(42-44)src/services/session_store_redis.ts (2)
getConnectCount(56-58)clearConnectCount(64-68)
🪛 Gitleaks (8.28.0)
__tests__/services/transformer.ts
[high] 1580-1580: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🔇 Additional comments (43)
src/services/outgoing_cloud_api.ts (1)
66-67: No change in behavior—looks good.The header definition remains valid and continues to set the correct content type. No issues spotted.
src/services/transformer.ts (1)
977-983: No functional deltas—logic still solid.The loop-based copy into a freshly allocated
Bufferremains intact; only style tweaks landed. No concerns here.package.json (1)
90-90: Confirm supply‑chain vetting on the new Baileys fork.Switching
baileysto the freshly publishednpm:whaileys@6.2.5means we now depend on a brand-new package maintained by a single publisher (released roughly six hours ago and created within the last two weeks). Given the active npm supply-chain attacks disclosed earlier this month, please double-check the provenance of this fork (audit the tarball, verify signatures/maintainers, and document the due diligence) before we ship.(socket.dev)src/services/config_by_env.ts (1)
125-132: Safe style tidy ongetRandomChar.Removing the trailing semicolon keeps the helper intact—no behavior change here. 👍
src/services/data_store_redis.ts (1)
141-215: Spacing tweak is fine.Whitespace-only update; template selection logic remains unchanged.
src/services/outgoing_amqp.ts (1)
21-27: Formatting keeps publish semantics intact.Argument layout is clearer while preserving the outgoing AMQP payload.
src/services/session_store_redis.ts (1)
45-105: Await-parentheses cleanup looks good.The added grouping clarifies intent without altering session reconnection behavior.
.gitignore (1)
10-11: Good call adding.idea.Helps keep local IDE artifacts out of source control.
src/jobs/commander.ts (3)
42-54: Multi-line publish call stays clear.This layout keeps the payload/options easy to scan without altering behavior. 👍
78-78: Inline publish call still reads cleanly.No behavior change and the single-line usage remains straightforward.
91-96: Payload formatting remains tidy.The expanded layout keeps the arguments readable while preserving the original logic.
src/services/logout_amqp.ts (1)
13-16: Formatting change looks good.Structure is untouched and the AMQP publish parameters stay the same.
src/jobs/webhook_status_failed.ts (2)
17-17: Nice addition of explicit charset.Clarifies the payload encoding without changing logic.
21-24: Re-throwing the fetch error is the right move.This lets the caller/RabbitMQ decide on retries instead of silently swallowing failures.
__tests__/services/outgoing_cloud_api.ts (3)
52-54: Trailing comma keeps diffs tidy.No functional impact; helps future edits.
66-67: Array formatting update is fine.Readability is unchanged and behavior stays the same.
80-81: Status array formatting looks good.Purely cosmetic and keeps things consistent with the rest of the file.
src/services/contact_baileys.ts (1)
32-33: Trailing comma change looks good.No behavioral impact here—object literal remains valid and readably formatted.
__tests__/jobs/timer.ts (1)
20-38: Formatting tidy-up appreciated.The expanded object/array layout improves readability without altering test behavior.
src/jobs/bulk_report.ts (1)
32-34: Readable retry messaging
Thanks for expanding the retry notification into a multi-line literal—much easier to scan, and no behavior change.src/jobs/bulk_sender.ts (1)
73-78: Bulk report handoff looks solid
Confirming the final-report publish now flows through the broker exchange with the same routing delay configuration the consumer expects—nice consistency touch.src/locales/pt_BR.json (1)
15-33: Locale update aligns with new status copy
Placeholders and wording forconnected_with_latest/deleted_messagemirror the new flow; Brazilian Portuguese strings look consistent with the EN source.__tests__/services/media_store_file.ts (1)
41-59: Great coverage for mime-type fallbacks
Appreciate the new tests around missing payloads and filename-derived MIME types—this nails the edge cases introduced in the store changes.__tests__/services/session_store_file.ts (1)
54-55: Clearer precedence for awaited boolean checkAdding the parentheses keeps the
awaitgrouped before the double negation, so the coercion happens on the resolved value rather than the pending Promise. Nice tidy-up.src/services/media_store_file.ts (1)
190-216: Good defensive guard around missing media metadataAppreciate the early return with a warning when the payload is absent and the extra branch to rebuild the path plus normalize
mime_type. That should save the caller from mysterious 500s when stored metadata is incomplete.src/services/send_error.ts (1)
1-9: No issues spotted inSendError.Formatting tweak keeps the class intact—nothing further needed here.
src/app.ts (1)
42-66: Express/socket wiring unchanged.Just formatting around the constructor/router calls—logic remains solid.
src/services/reload_amqp.ts (1)
14-18: AMQP publish calls look good.Inline formatting doesn’t alter behavior; message routing stays the same.
src/controllers/webhook_controller.ts (1)
34-40: Webhook verify flow unchanged.Spacing tweak keeps the config lookup and token check intact—no issues.
src/locales/pt.json (1)
9-33: No removed locale keys detected. All referenced keys (attempts_exceeded,auto_restart,on_read_qrcode) remain present in the locale files and are correctly used in the code.Likely an incorrect or invalid review comment.
__tests__/services/socket.ts (1)
52-52: Explicit whatsappVersion in the fixture looks good.
Locking the version in the test payload keeps the connect contract deterministic as the provider mix broadens.Also applies to: 68-68
__tests__/routes/blacklist.ts (1)
27-28: Blacklist payload assertion still targets the right contract.
The refactored literal continues to exercise TTL and destination arguments exactly as the handler expects.src/bridge.ts (1)
50-66: Direct bridge consumers remain well-scoped.
Keeping the queue name suffixed by server and passingtype: 'direct'preserves the per-instance routing you need for the bridge workers.src/web.ts (2)
11-19: Imports extension looks good.New defaults (BASE_URL/queues/exchange) correctly imported for use below.
61-65: Bind RELOAD consumer toReloadJob.consume, notReload.run
Usingreload.run.bind(reloadJob)attaches the service method to the job instance, butReloadJobexposesconsume, notrun; replace withreloadJob.consume.bind(reloadJob)Likely an incorrect or invalid review comment.
src/router.ts (2)
23-23: Controller import OK.New SessionCommandController import aligns with provider-specific session ops.
64-64: Instantiation OK.Dependency injection matches constructor signature (getConfig, reload, logout).
src/services/auto_connect.ts (2)
23-26: Provider allowlist extended correctly.Including 'whatsmeow' ensures recognized providers aren’t skipped during auto-connect.
39-43: Remove unnecessary sessionStore rename
DestructuringsessionStorefromstoredoes not shadow any existing variable—there is no outersessionStoreparameter, and this naming is consistently used across multiple files (reload_baileys.ts, session_controller.ts, phone_number_controller.ts, bind_bridge.ts).Likely an incorrect or invalid review comment.
src/services/config.ts (2)
76-81: Provider type updated to include 'whatsmeow' — good.Type now reflects supported providers across the codebase.
146-146: Default 'baileys' provider alters branch logic in jobs
Consumers like listener.ts useif (config.provider !== 'baileys')—undefined previously skipped but now defaults to'baileys', causing legacy entries to be processed. Backfill/migrate existing records or adjust logic to prevent unintended processing.src/defaults.ts (2)
165-165: Casting WHATSAPP_VERSION to WAVersion is OK.Makes typing explicit without changing runtime behavior.
81-81: Manual Verification Required: Confirm Broker Exchange Declaration TypeNo
assertExchangecall found forUNOAPI_EXCHANGE_BROKER_NAMEin the codebase; please verify in your infrastructure configuration that this exchange is provisioned with type'topic'.
__tests__/services/transformer.ts
Outdated
| // {"key":{"remoteJid":"555533800800@s.whatsapp.net","fromMe":false,"id":"1BE283407E62E5A073"},"messageTimestamp":1753900800,"pushName":"555533800800","broadcast":false,"message":{"messageContextInfo":{"deviceListMetadata":{"recipientKeyHash":"BuoOcp2GlUsdsQ==","recipientTimestamp":"1753278139","recipientKeyIndexes":[0,5]},"deviceListMetadataVersion":2},"buttonsMessage":{"contentText":"Para confirmar, estou falando com *IM Agronegócios* e o seu CNPJ é *41.281.5xx/xxxx-xx*?","buttons":[{"buttonId":"1","buttonText":{"displayText":"Sim"},"type":"RESPONSE"},{"buttonId":"2","buttonText":{"displayText":"Não"},"type":"RESPONSE"}],"headerType":"EMPTY"}},"verifiedBizName":"Unifique"} | ||
| // {"key":{"remoteJid":"555533800800@s.whatsapp.net","fromMe":true,"id":"3EB02FCD7C12A71F06DE34"}, "messageTimestamp":1753900805,"pushName":"Im Agronegócios","broadcast":false,"status":2, "message":{"buttonsResponseMessage":{"selectedButtonId":"1","selectedDisplayText":"Sim","contextInfo":{"stanzaId":"1BE283407E62E5A073","participant":"555533800800@s.whatsapp.net","quotedMessage":{"messageContextInfo":{},"buttonsMessage":{"contentText":"Para confirmar, estou falando com *IM Agronegócios* e o seu CNPJ é *41.281.5xx/xxxx-xx*?","buttons":[{"buttonId":"1","buttonText":{"displayText":"Sim"},"type":"RESPONSE"},{"buttonId":"2","buttonText":{"displayText":"Não"},"type":"RESPONSE"}],"headerType":"EMPTY"}}},"type":"DISPLAY_TEXT"}}} | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove raw payload from test comment.
The new inline JSON contains live-looking phone numbers, CNPJ fragments, and a recipientKeyHash, which triggers gitleaks and risks leaking sensitive customer data. Please remove it or replace it with a fully sanitized fixture so we keep the repo free of regulated data. Based on static analysis.
🧰 Tools
🪛 Gitleaks (8.28.0)
[high] 1580-1580: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🤖 Prompt for AI Agents
In __tests__/services/transformer.ts around lines 1580 to 1582, the inline JSON
test comment contains live-looking phone numbers, CNPJ fragments and a
recipientKeyHash that can trigger gitleaks; remove the raw payload and replace
it with a sanitized fixture or dummy data (e.g., replace phone numbers with
000000000000@s.whatsapp.net, CNPJ with 00.000.000/0000-00,
recipientKeyHash/stanzaId/participant/id with placeholder strings) or reference
a separate sanitized fixture file; ensure no real identifiers remain in the
comment before committing.
| const channel = await amqpGetChannel() | ||
| await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true }) | ||
| await channel?.assertQueue('outgoing.baileys', { durable: true }) | ||
| await channel?.bindQueue('outgoing.baileys', 'unoapi.outgoing', 'provider.baileys.*') | ||
|
|
||
| // Ensure Whatsmeow queues exist too (created but not consumed here) | ||
| await channel?.assertQueue('outgoing.baileys.dlq', { durable: true }) | ||
| await channel?.assertQueue('outgoing.whatsmeow', { durable: true }) | ||
| await channel?.bindQueue('outgoing.whatsmeow', 'unoapi.outgoing', 'provider.whatsmeow.*') | ||
| await channel?.assertQueue('outgoing.whatsmeow.dlq', { durable: true }) | ||
| const listenerAmqpWorker = new ListenerAmqp() | ||
| const onNewLogin = onNewLoginGenerateToken(outgoingCloudApi) | ||
| const incomingBaileysWorker = new IncomingBaileys(listenerAmqpWorker, getConfigRedis, getClientBaileys, onNewLogin) | ||
| const providerJob = new IncomingJob(incomingBaileysWorker, outgoingAmqp, getConfigRedis) | ||
| channel?.consume('outgoing.baileys', async (payload) => { | ||
| if (!payload) { | ||
| return | ||
| } | ||
| const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) | ||
| const data = JSON.parse(payload.content.toString()) | ||
| try { | ||
| await providerJob.consume(phone, data) | ||
| } catch (error) { | ||
| logger.error(error, 'Error consuming provider.baileys message') | ||
| } | ||
| channel.ack(payload) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use amqpConsume for provider queue to retain retries, delays, DLQ, timeouts, and prefetch.
Direct channel.consume lacks retry/backoff, timeout, DLQ routing, and priority handling; it also acks even on error.
- const channel = await amqpGetChannel()
- await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true })
- await channel?.assertQueue('outgoing.baileys', { durable: true })
- await channel?.bindQueue('outgoing.baileys', 'unoapi.outgoing', 'provider.baileys.*')
-
- // Ensure Whatsmeow queues exist too (created but not consumed here)
- await channel?.assertQueue('outgoing.baileys.dlq', { durable: true })
- await channel?.assertQueue('outgoing.whatsmeow', { durable: true })
- await channel?.bindQueue('outgoing.whatsmeow', 'unoapi.outgoing', 'provider.whatsmeow.*')
- await channel?.assertQueue('outgoing.whatsmeow.dlq', { durable: true })
- const listenerAmqpWorker = new ListenerAmqp()
+ const channel = await amqpGetChannel()
+ await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true })
+ const listenerAmqpWorker = new ListenerAmqp()
const onNewLogin = onNewLoginGenerateToken(outgoingCloudApi)
const incomingBaileysWorker = new IncomingBaileys(listenerAmqpWorker, getConfigRedis, getClientBaileys, onNewLogin)
const providerJob = new IncomingJob(incomingBaileysWorker, outgoingAmqp, getConfigRedis)
- channel?.consume('outgoing.baileys', async (payload) => {
- if (!payload) {
- return
- }
- const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey)
- const data = JSON.parse(payload.content.toString())
- try {
- await providerJob.consume(phone, data)
- } catch (error) {
- logger.error(error, 'Error consuming provider.baileys message')
- }
- channel.ack(payload)
- })
+ await amqpConsume('unoapi.outgoing', 'outgoing.baileys', 'provider.baileys.*', providerJob.consume.bind(providerJob), {
+ type: 'topic',
+ prefetch,
+ notifyFailedMessages,
+ })📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const channel = await amqpGetChannel() | |
| await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true }) | |
| await channel?.assertQueue('outgoing.baileys', { durable: true }) | |
| await channel?.bindQueue('outgoing.baileys', 'unoapi.outgoing', 'provider.baileys.*') | |
| // Ensure Whatsmeow queues exist too (created but not consumed here) | |
| await channel?.assertQueue('outgoing.baileys.dlq', { durable: true }) | |
| await channel?.assertQueue('outgoing.whatsmeow', { durable: true }) | |
| await channel?.bindQueue('outgoing.whatsmeow', 'unoapi.outgoing', 'provider.whatsmeow.*') | |
| await channel?.assertQueue('outgoing.whatsmeow.dlq', { durable: true }) | |
| const listenerAmqpWorker = new ListenerAmqp() | |
| const onNewLogin = onNewLoginGenerateToken(outgoingCloudApi) | |
| const incomingBaileysWorker = new IncomingBaileys(listenerAmqpWorker, getConfigRedis, getClientBaileys, onNewLogin) | |
| const providerJob = new IncomingJob(incomingBaileysWorker, outgoingAmqp, getConfigRedis) | |
| channel?.consume('outgoing.baileys', async (payload) => { | |
| if (!payload) { | |
| return | |
| } | |
| const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) | |
| const data = JSON.parse(payload.content.toString()) | |
| try { | |
| await providerJob.consume(phone, data) | |
| } catch (error) { | |
| logger.error(error, 'Error consuming provider.baileys message') | |
| } | |
| channel.ack(payload) | |
| }) | |
| const channel = await amqpGetChannel() | |
| await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true }) | |
| const listenerAmqpWorker = new ListenerAmqp() | |
| const onNewLogin = onNewLoginGenerateToken(outgoingCloudApi) | |
| const incomingBaileysWorker = new IncomingBaileys( | |
| listenerAmqpWorker, | |
| getConfigRedis, | |
| getClientBaileys, | |
| onNewLogin | |
| ) | |
| const providerJob = new IncomingJob( | |
| incomingBaileysWorker, | |
| outgoingAmqp, | |
| getConfigRedis | |
| ) | |
| await amqpConsume( | |
| 'unoapi.outgoing', | |
| 'outgoing.baileys', | |
| 'provider.baileys.*', | |
| providerJob.consume.bind(providerJob), | |
| { | |
| type: 'topic', | |
| prefetch, | |
| notifyFailedMessages, | |
| } | |
| ) |
🤖 Prompt for AI Agents
In src/broker.ts around lines 123 to 149, replace the direct channel.consume
call with the project's amqpConsume helper so the consumer benefits from
retries/backoff, DLQ routing, timeouts and prefetch; move the routing-key
extraction and JSON.parse inside the amqpConsume handler, call
providerJob.consume and only ack on success (do not ack on exceptions), and pass
appropriate amqpConsume options (prefetch, timeout, dlq/enabled flags) so errors
are retried/forwarded to the DLQ rather than being acknowledged immediately.
| } else if (IGNORE_OWN_MESSAGES_DECRYPT_ERROR && isOutgoingMessage(error.getContent())) { | ||
| logger.warn('Ignore decrypt error for own message %s', error.getMessageId()) | ||
| logger.debug('Ignore decrypt error for own message %s', error.getMessageId()) | ||
| return | ||
| } else if (options && options?.countRetries >= options?.maxRetries) { | ||
| logger.warn('Decryption error overread max retries message %s', error.getMessageId()) | ||
| // send message asking to open whatsapp to see | ||
| return this.outgoing.send(phone, error.getContent()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid resubmitting the original (undecryptable) content; send a safe fallback instead.
Forwarding error.getContent() risks loops and repeated failures. Send a clear text notice (or route to a DLQ) when retries are exhausted.
- } else if (options && options?.countRetries >= options?.maxRetries) {
- logger.warn('Decryption error overread max retries message %s', error.getMessageId())
- // send message asking to open whatsapp to see
- return this.outgoing.send(phone, error.getContent())
+ } else if (options && options?.countRetries >= options?.maxRetries) {
+ logger.warn('Decryption error exceeded max retries for message %s', error.getMessageId())
+ // Send safe fallback to avoid resubmitting the problematic payload
+ const fallback = {
+ type: 'text',
+ text: { body: 'We could not decrypt a message. Please open WhatsApp on your device and try again.' },
+ }
+ return this.outgoing.send(phone, fallback)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| } else if (IGNORE_OWN_MESSAGES_DECRYPT_ERROR && isOutgoingMessage(error.getContent())) { | |
| logger.warn('Ignore decrypt error for own message %s', error.getMessageId()) | |
| logger.debug('Ignore decrypt error for own message %s', error.getMessageId()) | |
| return | |
| } else if (options && options?.countRetries >= options?.maxRetries) { | |
| logger.warn('Decryption error overread max retries message %s', error.getMessageId()) | |
| // send message asking to open whatsapp to see | |
| return this.outgoing.send(phone, error.getContent()) | |
| } | |
| } else if (IGNORE_OWN_MESSAGES_DECRYPT_ERROR && isOutgoingMessage(error.getContent())) { | |
| logger.debug('Ignore decrypt error for own message %s', error.getMessageId()) | |
| return | |
| } else if (options && options?.countRetries >= options?.maxRetries) { | |
| logger.warn('Decryption error exceeded max retries for message %s', error.getMessageId()) | |
| // Send safe fallback to avoid resubmitting the problematic payload | |
| const fallback = { | |
| type: 'text', | |
| text: { body: 'We could not decrypt a message. Please open WhatsApp on your device and try again.' }, | |
| } | |
| return this.outgoing.send(phone, fallback) | |
| } |
| if (typeof mediaKey === 'string') { | ||
| // assume base64 | ||
| try { | ||
| mediaKey = Buffer.from(mediaKey, 'base64') | ||
| } catch (_) { /* ignore conversion errors */ } | ||
| } else if (typeof mediaKey === 'object' && mediaKey !== null) { | ||
| try { | ||
| mediaKey = Uint8Array.from(Object.values(mediaKey)) | ||
| } catch (_) { /* ignore conversion errors */ } | ||
| } | ||
| const mapMediaType = { | ||
| image: 'image', | ||
| video: 'video', | ||
| document: 'document', | ||
| sticker: 'sticker', | ||
| audio: 'audio', | ||
| } as const | ||
| try { | ||
| const stream = await downloadContentFromMessage( | ||
| { | ||
| mediaKey, | ||
| directPath, | ||
| url, | ||
| } as any, | ||
| mapMediaType[message.type], | ||
| {}, | ||
| ) | ||
| const chunks: Buffer[] = [] | ||
| for await (const chunk of stream) { | ||
| chunks.push(chunk as Buffer) | ||
| } | ||
| const buffer = Buffer.concat(chunks) | ||
| const mimetype: string = media.mime_type || media.mimetype || (media.filename ? (mime.lookup(media.filename) as string) : 'application/octet-stream') | ||
| const filePath = mediaStore.getFilePath(phone, message.id, mimetype) | ||
| await mediaStore.saveMediaBuffer(filePath, buffer) | ||
| const mediaId = `${phone}/${message.id}` | ||
| const payloadMedia = { | ||
| messaging_product: 'whatsapp', | ||
| mime_type: mimetype, | ||
| sha256: media.fileSha256 || media.sha256, | ||
| file_size: media.fileLength || media.file_size, | ||
| id: mediaId, | ||
| filename: media.filename || filePath, | ||
| } | ||
| await dataStore.setMediaPayload(message.id, payloadMedia) | ||
| message[message.type].id = mediaId | ||
| } catch (err) { | ||
| // If download/decrypt fails, leave message as-is | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix mediaKey normalization before decrypting.
When media.media_key arrives as a Buffer JSON object (e.g., { type: 'Buffer', data: [...] }), Uint8Array.from(Object.values(mediaKey)) produces [0,0], so downloadContentFromMessage runs with the wrong key and the WhatsMeow media download fails. Guard against the serialized Buffer shape (and other array-like forms) by pulling mediaKey.data (or accepting plain arrays) instead of Object.values, and bail if you still can't build a byte array. Example patch:
- } else if (typeof mediaKey === 'object' && mediaKey !== null) {
- try {
- mediaKey = Uint8Array.from(Object.values(mediaKey))
- } catch (_) { /* ignore conversion errors */ }
+ } else if (typeof mediaKey === 'object' && mediaKey !== null) {
+ const candidate =
+ Array.isArray(mediaKey) ? mediaKey :
+ Array.isArray(mediaKey.data) ? mediaKey.data :
+ undefined
+ if (candidate) {
+ mediaKey = Uint8Array.from(candidate)
+ } else {
+ return message
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (typeof mediaKey === 'string') { | |
| // assume base64 | |
| try { | |
| mediaKey = Buffer.from(mediaKey, 'base64') | |
| } catch (_) { /* ignore conversion errors */ } | |
| } else if (typeof mediaKey === 'object' && mediaKey !== null) { | |
| try { | |
| mediaKey = Uint8Array.from(Object.values(mediaKey)) | |
| } catch (_) { /* ignore conversion errors */ } | |
| } | |
| const mapMediaType = { | |
| image: 'image', | |
| video: 'video', | |
| document: 'document', | |
| sticker: 'sticker', | |
| audio: 'audio', | |
| } as const | |
| try { | |
| const stream = await downloadContentFromMessage( | |
| { | |
| mediaKey, | |
| directPath, | |
| url, | |
| } as any, | |
| mapMediaType[message.type], | |
| {}, | |
| ) | |
| const chunks: Buffer[] = [] | |
| for await (const chunk of stream) { | |
| chunks.push(chunk as Buffer) | |
| } | |
| const buffer = Buffer.concat(chunks) | |
| const mimetype: string = media.mime_type || media.mimetype || (media.filename ? (mime.lookup(media.filename) as string) : 'application/octet-stream') | |
| const filePath = mediaStore.getFilePath(phone, message.id, mimetype) | |
| await mediaStore.saveMediaBuffer(filePath, buffer) | |
| const mediaId = `${phone}/${message.id}` | |
| const payloadMedia = { | |
| messaging_product: 'whatsapp', | |
| mime_type: mimetype, | |
| sha256: media.fileSha256 || media.sha256, | |
| file_size: media.fileLength || media.file_size, | |
| id: mediaId, | |
| filename: media.filename || filePath, | |
| } | |
| await dataStore.setMediaPayload(message.id, payloadMedia) | |
| message[message.type].id = mediaId | |
| } catch (err) { | |
| // If download/decrypt fails, leave message as-is | |
| } | |
| if (typeof mediaKey === 'string') { | |
| // assume base64 | |
| try { | |
| mediaKey = Buffer.from(mediaKey, 'base64') | |
| } catch (_) { /* ignore conversion errors */ } | |
| } else if (typeof mediaKey === 'object' && mediaKey !== null) { | |
| const candidate = | |
| Array.isArray(mediaKey) ? mediaKey : | |
| Array.isArray((mediaKey as any).data) ? (mediaKey as any).data : | |
| undefined | |
| if (candidate) { | |
| mediaKey = Uint8Array.from(candidate) | |
| } else { | |
| return message | |
| } | |
| } | |
| const mapMediaType = { | |
| image: 'image', | |
| video: 'video', | |
| document: 'document', | |
| sticker: 'sticker', | |
| audio: 'audio', | |
| } as const | |
| try { | |
| const stream = await downloadContentFromMessage( | |
| { | |
| mediaKey, | |
| directPath, | |
| url, | |
| } as any, | |
| mapMediaType[message.type], | |
| {}, | |
| ) | |
| const chunks: Buffer[] = [] | |
| for await (const chunk of stream) { | |
| chunks.push(chunk as Buffer) | |
| } | |
| const buffer = Buffer.concat(chunks) | |
| const mimetype: string = media.mime_type || media.mimetype || (media.filename ? (mime.lookup(media.filename) as string) : 'application/octet-stream') | |
| const filePath = mediaStore.getFilePath(phone, message.id, mimetype) | |
| await mediaStore.saveMediaBuffer(filePath, buffer) | |
| const mediaId = `${phone}/${message.id}` | |
| const payloadMedia = { | |
| messaging_product: 'whatsapp', | |
| mime_type: mimetype, | |
| sha256: media.fileSha256 || media.sha256, | |
| file_size: media.fileLength || media.file_size, | |
| id: mediaId, | |
| filename: media.filename || filePath, | |
| } | |
| await dataStore.setMediaPayload(message.id, payloadMedia) | |
| message[message.type].id = mediaId | |
| } catch (err) { | |
| // If download/decrypt fails, leave message as-is | |
| } |
| const channel = await amqpGetChannel() | ||
| const routingKey = `provider.${provider}.${phone}` | ||
| if (status) { | ||
| options['type'] = 'direct' | ||
| options['priority'] = 3 // update status is always middle important | ||
| await amqpPublish( | ||
| UNOAPI_EXCHANGE_BRIDGE_NAME, | ||
| `${UNOAPI_QUEUE_INCOMING}.${config.server!}`, | ||
| phone, | ||
| { payload, options }, | ||
| options | ||
| ) | ||
| const data = { payload: pl, options } | ||
| channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), { | ||
| contentType: 'application/json', | ||
| messageId: (payload as any).message_id, | ||
| persistent: true, | ||
| }) | ||
| return { ok: { success: true } } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Publish via amqpPublish to keep retries, priorities, and DLQ; current channel.publish drops these guarantees.
Also, options.type = 'direct' here has no broker effect; the exchange is topic.
- const channel = await amqpGetChannel()
- const routingKey = `provider.${provider}.${phone}`
+ const routingKey = `provider.${provider}.${phone}`
if (status) {
- options['type'] = 'direct'
- options['priority'] = 3 // update status is always middle important
- const data = { payload: pl, options }
- channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), {
- contentType: 'application/json',
- messageId: (payload as any).message_id,
- persistent: true,
- })
+ const data = { payload: pl, options }
+ await amqpPublish(EXCHANGE, 'outgoing.baileys', routingKey, data, { type: 'topic', priority: 3 })
return { ok: { success: true } }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const channel = await amqpGetChannel() | |
| const routingKey = `provider.${provider}.${phone}` | |
| if (status) { | |
| options['type'] = 'direct' | |
| options['priority'] = 3 // update status is always middle important | |
| await amqpPublish( | |
| UNOAPI_EXCHANGE_BRIDGE_NAME, | |
| `${UNOAPI_QUEUE_INCOMING}.${config.server!}`, | |
| phone, | |
| { payload, options }, | |
| options | |
| ) | |
| const data = { payload: pl, options } | |
| channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), { | |
| contentType: 'application/json', | |
| messageId: (payload as any).message_id, | |
| persistent: true, | |
| }) | |
| return { ok: { success: true } } | |
| // … somewhere above, remove/amqpGetChannel is no longer needed here | |
| const routingKey = `provider.${provider}.${phone}` | |
| if (status) { | |
| const data = { payload: pl, options } | |
| await amqpPublish( | |
| EXCHANGE, | |
| 'outgoing.baileys', | |
| routingKey, | |
| data, | |
| { type: 'topic', priority: 3 } | |
| ) | |
| return { ok: { success: true } } | |
| } |
🤖 Prompt for AI Agents
In src/services/incoming_amqp.ts around lines 44 to 55, the code uses
channel.publish directly and mutates options.type='direct' which has no effect
for a topic exchange; replace the direct channel.publish call with the existing
amqpPublish helper (so the message benefits from retries, priority handling and
DLQ behavior), remove setting options.type='direct' (since exchange is topic),
ensure options.priority is set (e.g. 3) on the message/options object passed to
amqpPublish, and pass message properties (contentType, messageId, persistent)
through the amqpPublish call so semantics stay the same; return the same { ok: {
success: true } } after the publish.
| const data = { payload: pl, id, options } | ||
| channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), { | ||
| contentType: 'application/json', | ||
| messageId: id, | ||
| persistent: true, | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same: use amqpPublish for message sends and set priority via options (AMQP property), not inside payload.
- const data = { payload: pl, id, options }
- channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), {
- contentType: 'application/json',
- messageId: id,
- persistent: true,
- })
+ const data = { payload: pl, id, options }
+ await amqpPublish(EXCHANGE, 'outgoing.baileys', routingKey, data, {
+ type: 'topic',
+ priority: (options as any).priority || 5,
+ })📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const data = { payload: pl, id, options } | |
| channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), { | |
| contentType: 'application/json', | |
| messageId: id, | |
| persistent: true, | |
| }) | |
| const data = { payload: pl, id, options } | |
| await amqpPublish(EXCHANGE, 'outgoing.baileys', routingKey, data, { | |
| type: 'topic', | |
| priority: (options as any).priority || 5, | |
| }) |
🤖 Prompt for AI Agents
In src/services/incoming_amqp.ts around lines 61 to 66, the code is directly
using channel.publish and embedding options/priority inside the JSON payload;
change this to call the project's amqpPublish helper and pass message properties
(messageId, contentType, persistent, and priority from the options object) as
AMQP properties rather than inside the payload, and ensure the published payload
only contains the actual payload and id (omit options/priority from the
serialized data).
src/services/redis.ts
Outdated
| export const clearConnectCount = async (phone: string) => { | ||
| const keyPattern = connectCountKey(phone, '*') | ||
| const keys = await redisKeys(keyPattern) | ||
| for (let index = 0; index < keys.length.length; index++) { | ||
| const key = keys[index]; | ||
| const key = keys[index] | ||
| await redisDel(key) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the loop bound so connect counts are actually cleared.
keys.length.length always evaluates to undefined, so the loop never runs and stale connect-count keys are left in Redis. This breaks the retry throttle logic. Please iterate up to keys.length instead.
- for (let index = 0; index < keys.length.length; index++) {
+ for (let index = 0; index < keys.length; index++) {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export const clearConnectCount = async (phone: string) => { | |
| const keyPattern = connectCountKey(phone, '*') | |
| const keys = await redisKeys(keyPattern) | |
| for (let index = 0; index < keys.length.length; index++) { | |
| const key = keys[index]; | |
| const key = keys[index] | |
| await redisDel(key) | |
| } | |
| export const clearConnectCount = async (phone: string) => { | |
| const keyPattern = connectCountKey(phone, '*') | |
| const keys = await redisKeys(keyPattern) | |
| for (let index = 0; index < keys.length; index++) { | |
| const key = keys[index] | |
| await redisDel(key) | |
| } | |
| } |
🤖 Prompt for AI Agents
In src/services/redis.ts around lines 407 to 413, the for-loop uses
keys.length.length which is undefined so the loop never runs; change the loop
bound to keys.length (or replace with a simple for..of over keys) so each key is
iterated and redisDel is called to actually clear connect-count keys from Redis.
| setInterval(() => { | ||
| dataStore.writeToFile(dataFile), 10_0000 | ||
| ;(dataStore.writeToFile(dataFile), 10_0000) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore the setInterval delay to avoid a tight write loop.
Moving the 10_0000 literal inside the callback removes the second argument from setInterval, so the timer now defaults to 0 ms and continuously hammers writeToFile. That will peg the event loop, thrash the disk, and likely corrupt the data store under load. Please reintroduce the delay argument (e.g., setInterval(() => dataStore.writeToFile(dataFile), 10_000)), and drop the comma expression inside the callback.
🤖 Prompt for AI Agents
In src/services/store_file.ts around lines 83 to 85, the setInterval call
mistakenly places the numeric delay inside the callback as a comma-expression,
so setInterval is invoked without a delay (default 0ms) and will spin; change
the call to pass the delay as the second argument and drop the comma-expression
so the callback simply calls dataStore.writeToFile(dataFile) and setInterval
receives 10_000 (or 10000) as the delay.
| channel?.consume('outgoing.baileys', async (payload) => { | ||
| if (!payload) { | ||
| return | ||
| } | ||
| const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) | ||
| const data = JSON.parse(payload.content.toString()) | ||
| try { | ||
| await providerJob.consume(phone, data) | ||
| } catch (error) { | ||
| logger.error(error, 'Error consuming provider.baileys message') | ||
| } | ||
| channel.ack(payload) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Avoid ACKing provider messages when processing fails
Right now we ACK every outgoing.baileys delivery even if providerJob.consume throws. That drops the message permanently, so any transient failure (client offline, network blip, adapter hiccup) results in data loss with zero retry. We need to ACK only after a successful send and requeue (or route to DLQ) on failures so we preserve reliability.
- channel?.consume('outgoing.baileys', async (payload) => {
+ channel?.consume('outgoing.baileys', async (payload) => {
if (!payload) {
return
}
const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey)
const data = JSON.parse(payload.content.toString())
try {
await providerJob.consume(phone, data)
- } catch (error) {
- logger.error(error, 'Error consuming provider.baileys message')
- }
- channel.ack(payload)
+ channel.ack(payload)
+ } catch (error) {
+ logger.error(error, 'Error consuming provider.baileys message')
+ channel.nack(payload, false, true) // requeue so the message isn’t lost
+ }
})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| channel?.consume('outgoing.baileys', async (payload) => { | |
| if (!payload) { | |
| return | |
| } | |
| const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) | |
| const data = JSON.parse(payload.content.toString()) | |
| try { | |
| await providerJob.consume(phone, data) | |
| } catch (error) { | |
| logger.error(error, 'Error consuming provider.baileys message') | |
| } | |
| channel.ack(payload) | |
| }) | |
| channel?.consume('outgoing.baileys', async (payload) => { | |
| if (!payload) { | |
| return | |
| } | |
| const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) | |
| const data = JSON.parse(payload.content.toString()) | |
| try { | |
| await providerJob.consume(phone, data) | |
| channel.ack(payload) | |
| } catch (error) { | |
| logger.error(error, 'Error consuming provider.baileys message') | |
| channel.nack(payload, false, true) // requeue so the message isn’t lost | |
| } | |
| }) |
🤖 Prompt for AI Agents
In src/standalone.ts around lines 176-188: the code currently ack()s every
'outgoing.baileys' message regardless of providerJob.consume outcome; change it
so the message is acked only after providerJob.consume completes successfully,
and on error do not ack — instead call channel.nack(payload, false, true) to
requeue transient failures (or channel.reject with requeue=true) and implement a
max-retry/Dead Letter routing based on a retry header (increment a retry count
and route to DLQ when exceeded). Also ensure errors are logged with context and
that ack/nack is called exactly once per delivery.
…vider This commit updates the codebase to align with version v3.0.0 and introduces support for a new WhatsMeow provider. A compatible adapter is required and is available at: https://github.com/mbap-dev/provider-whatsmeow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (8)
src/defaults.ts (1)
37-37: Typo in constant name (flagged in previous review).The constant name
WHATSOMEOW_ADAPTER_BASEURLcontains a typo ("WHATSOMEOW" instead of "WHATSMEOW"). This was already flagged in a previous review but remains unchanged.src/jobs/outgoing.ts (2)
172-181: Fix mediaKey normalization for Buffer JSON objects.When
media.media_keyarrives as a serialized Buffer (e.g.,{ type: 'Buffer', data: [...] }),Object.values(mediaKey)produces an incorrect array[type, data]instead of the byte values. This was flagged in a previous review but remains unchanged.Apply the suggested fix from the previous review:
} else if (typeof mediaKey === 'object' && mediaKey !== null) { - try { - mediaKey = Uint8Array.from(Object.values(mediaKey)) - } catch (_) { /* ignore conversion errors */ } + const candidate = + Array.isArray(mediaKey) ? mediaKey : + Array.isArray(mediaKey.data) ? mediaKey.data : + undefined + if (candidate) { + mediaKey = Uint8Array.from(candidate) + } else { + return message + }
204-204: Ensure mimetype always has a valid fallback.
mime.lookup(media.filename)returnsfalsefor unknown extensions, which can break downstream code expecting a string. This was flagged in a previous review but remains unchanged.Apply the suggested fix:
-const mimetype: string = media.mime_type || media.mimetype || (media.filename ? (mime.lookup(media.filename) as string) : 'application/octet-stream') +const inferred = media.filename ? mime.lookup(media.filename) : undefined +const mimetype: string = media.mime_type || media.mimetype || inferred || 'application/octet-stream'src/broker.ts (1)
148-173: Use amqpConsume for provider queue (flagged in previous review).The direct
channel.consumecall bypasses the project's retry/backoff, DLQ routing, timeout handling, and prefetch logic. This was flagged as a critical issue in a previous review but remains unchanged.Apply the suggested fix from the previous review to use
amqpConsume:-const channel = await amqpGetChannel() -await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true }) -await channel?.assertQueue('outgoing.baileys', { durable: true }) -await channel?.bindQueue('outgoing.baileys', 'unoapi.outgoing', 'provider.baileys.*') -// Ensure Whatsmeow queues exist too (created but not consumed here) -await channel?.assertQueue('outgoing.baileys.dlq', { durable: true }) -await channel?.assertQueue('outgoing.whatsmeow', { durable: true }) -await channel?.bindQueue('outgoing.whatsmeow', 'unoapi.outgoing', 'provider.whatsmeow.*') -await channel?.assertQueue('outgoing.whatsmeow.dlq', { durable: true }) const listenerAmqpWorker = new ListenerAmqp() const onNewLogin = onNewLoginGenerateToken(outgoingCloudApi) const incomingBaileysWorker = new IncomingBaileys(listenerAmqpWorker, getConfigRedis, getClientBaileys, onNewLogin) const providerJob = new IncomingJob(incomingBaileysWorker, outgoingAmqp, getConfigRedis) -channel?.consume('outgoing.baileys', async (payload) => { - if (!payload) { - return - } - const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) - const data = JSON.parse(payload.content.toString()) - try { - await providerJob.consume(phone, data) - } catch (error) { - logger.error(error, 'Error consuming provider.baileys message') - } - channel.ack(payload) -}) +await amqpConsume('unoapi.outgoing', 'outgoing.baileys', 'provider.baileys.*', providerJob.consume.bind(providerJob), { + type: 'topic', + prefetch, + notifyFailedMessages, +})src/services/incoming_amqp.ts (3)
2-2: ImportamqpPublishto leverage existing retry/DLQ infrastructure.The previous review comment requesting this change has not been addressed. The code still uses direct
channel?.publishcalls instead of the project'samqpPublishhelper, which means messages bypass retry logic, DLQ routing, and priority handling.Apply this diff to add the missing import:
-import { amqpGetChannel } from '../amqp' +import { amqpGetChannel, amqpPublish } from '../amqp'
53-64: UseamqpPublishfor reliable message delivery with retry/DLQ support.The previous review comment requesting this change has not been addressed. Direct
channel?.publishbypasses:
- Automatic retry on transient failures
- Dead-letter queue routing after max retries
- Consistent message property handling
Additionally, setting
options['type'] = 'direct'has no effect since the exchange type istopic.Apply this diff:
- const channel = await amqpGetChannel() const routingKey = `provider.${provider}.${phone}` if (status) { - options['type'] = 'direct' - options['priority'] = 3 // update status is always middle important const data = { payload: pl, options } - channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), { - contentType: 'application/json', - messageId: (payload as any).message_id, - persistent: true, - }) + await amqpPublish(EXCHANGE, 'outgoing.baileys', routingKey, data, { + type: 'topic', + priority: 3, + }) return { ok: { success: true } }
70-75: UseamqpPublishfor reliable message delivery with retry/DLQ support.The previous review comment requesting this change has not been addressed. Direct
channel?.publishbypasses the project's retry, DLQ, and priority-handling infrastructure.Apply this diff:
const data = { payload: pl, id, options } - channel?.publish(EXCHANGE, routingKey, Buffer.from(JSON.stringify(data)), { - contentType: 'application/json', - messageId: id, - persistent: true, - }) + await amqpPublish(EXCHANGE, 'outgoing.baileys', routingKey, data, { + type: 'topic', + priority: (options as any).priority || 5, + })src/standalone.ts (1)
196-208: Avoid ACKing messages when processing fails.The previous review comment requesting this change has not been addressed. Unconditional ACK on line 207 means failed messages are permanently lost with no retry, even for transient failures (network blips, client offline, adapter unavailable). This degrades reliability guarantees.
Apply this diff to enable retries:
channel?.consume('outgoing.baileys', async (payload) => { if (!payload) { return } const phone = extractRoutingKeyFromBindingKey(payload.fields.routingKey) const data = JSON.parse(payload.content.toString()) try { await providerJob.consume(phone, data) + channel.ack(payload) } catch (error) { logger.error(error, 'Error consuming provider.baileys message') + channel.nack(payload, false, true) // requeue for retry } - channel.ack(payload) })For production robustness, also implement max-retry limiting with DLQ routing (track retry count in message headers and route to DLQ after exceeding the limit).
🧹 Nitpick comments (4)
src/services/media_store_file.ts (1)
212-212: Consider explicit type handling formime.lookup().The
mime.lookup()method returnsstring | false, which meansmimeTypecould befalse. While the conditional spread at line 215 handles this correctly at runtime, TypeScript strict mode may raise type errors if the payload'smime_typefield expects onlystring.Consider making the type handling explicit:
- const mimeType = mediaPayload.mime_type || mime.lookup(filePath) + const lookedUpType = mime.lookup(filePath) + const mimeType = mediaPayload.mime_type || (lookedUpType || undefined)This ensures
mimeTypeisstring | undefinedrather thanstring | false, which may be clearer and more type-safe.src/amqp.ts (1)
119-119: Consider the implications of unlimited event listeners.Setting
setMaxListeners(0)removes the default Node.js limit (10) on event listeners, which can mask memory leaks if listeners accumulate over time. If you're adding many listeners dynamically (e.g., per-session bindings), ensure they're properly cleaned up on disconnect.If unbounded listeners are truly needed, document why in a comment:
+ // Allow unlimited listeners for dynamic per-session AMQP bindings amqpChannel?.setMaxListeners(0)src/jobs/outgoing.ts (2)
233-235: Consider making the delay conditional or removing it.The 1-second
sleep(1000)now applies to all webhook dispatches (messages, statuses, media) after WhatsMeow processing. While reduced from 2 seconds (flagged in a previous review), this still impacts throughput. Consider:
- Apply delay only for status/receipt updates:
-// If this is a status/receipt payload, optionally delay to ensure -// related media messages are delivered first to the webhook consumer. -await sleep(1000) +if (isUpdateMessage(payload)) { + await sleep(1000) +}
- Or rely on the existing
delayFuncandUNOAPI_DELAY_AFTER_FIRST_MESSAGE_WEBHOOK_MSfor ordering guarantees, and remove this sleep entirely.
225-227: Minor: from normalization logic could be clearer.The check
!/^\d+$/.test(message.from)prevents normalization for digits-only phone numbers (to avoid BR-specific '9' insertion). The comment explains this, but the double-negative logic is subtle.Consider a positive assertion:
-// Preserve digits-only Cloud payloads from adapters (e.g., whatsmeow) -// to avoid BR-specific normalization that inserts a '9' and -// breaks alignment with contacts[0].wa_id used by Chatwoot. -if (!/^\d+$/.test(message.from)) { +// Only normalize non-numeric IDs (JIDs with '@' etc.) +// to avoid BR-specific '9' insertion on plain phone numbers +const isPlainPhoneNumber = /^\d+$/.test(message.from) +if (!isPlainPhoneNumber) { message.from = jidToPhoneNumber(message.from, '') }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
.gitignore(1 hunks)public/index.html(9 hunks)src/amqp.ts(2 hunks)src/broker.ts(3 hunks)src/controllers/phone_number_controller.ts(2 hunks)src/controllers/session_command_controller.ts(1 hunks)src/defaults.ts(1 hunks)src/jobs/bind_bridge.ts(3 hunks)src/jobs/incoming_whatsmeow.ts(1 hunks)src/jobs/outgoing.ts(3 hunks)src/router.ts(3 hunks)src/services/auto_connect.ts(2 hunks)src/services/client_baileys.ts(1 hunks)src/services/config.ts(1 hunks)src/services/incoming_amqp.ts(3 hunks)src/services/media_store_file.ts(2 hunks)src/standalone.ts(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- src/controllers/phone_number_controller.ts
- src/services/auto_connect.ts
- src/router.ts
- src/controllers/session_command_controller.ts
- src/services/config.ts
- .gitignore
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-27T11:42:53.193Z
Learnt from: PP-Contrib
PR: clairton/unoapi-cloud#91
File: src/jobs/webhooker.ts:65-67
Timestamp: 2024-11-27T11:42:53.193Z
Learning: In `src/jobs/webhooker.ts`, error handling for single webhook HTTP requests in the `WebhookerJob` class is managed by RabbitMQ, so additional error handling in the `consume` method is unnecessary.
Applied to files:
src/broker.ts
🧬 Code graph analysis (7)
src/jobs/incoming_whatsmeow.ts (2)
src/amqp.ts (1)
amqpPublish(211-262)src/defaults.ts (2)
UNOAPI_EXCHANGE_BROKER_NAME(78-78)UNOAPI_QUEUE_OUTGOING(91-91)
src/amqp.ts (3)
src/defaults.ts (1)
UNOAPI_EXCHANGE_BRIDGE_NAME(79-79)src/services/listener_amqp.ts (2)
_(33-33)ListenerAmqp(39-53)src/waker.ts (1)
Promise(42-69)
src/broker.ts (6)
src/amqp.ts (2)
amqpGetChannel(114-123)extractRoutingKeyFromBindingKey(56-59)src/services/listener_amqp.ts (1)
ListenerAmqp(39-53)src/services/on_new_login_generate_token.ts (1)
onNewLoginGenerateToken(8-33)src/services/incoming_baileys.ts (2)
IncomingBaileys(8-31)constructor(14-19)src/services/client_baileys.ts (1)
getClientBaileys(42-76)src/jobs/incoming.ts (1)
IncomingJob(12-189)
src/jobs/outgoing.ts (4)
src/amqp.ts (1)
amqpPublish(211-262)src/defaults.ts (2)
UNOAPI_EXCHANGE_BROKER_NAME(78-78)UNOAPI_QUEUE_OUTGOING(91-91)src/services/store.ts (1)
store(21-23)src/services/transformer.ts (3)
isUpdateMessage(483-486)TYPE_MESSAGES_MEDIA(12-12)jidToPhoneNumber(522-539)
src/services/incoming_amqp.ts (3)
src/amqp.ts (1)
amqpGetChannel(114-123)src/services/transformer.ts (2)
getGroupId(439-454)jidToPhoneNumber(522-539)src/utils/id.ts (1)
generateUnoId(13-13)
src/standalone.ts (4)
src/amqp.ts (3)
amqpConsume(264-359)amqpGetChannel(114-123)extractRoutingKeyFromBindingKey(56-59)src/services/incoming_baileys.ts (1)
IncomingBaileys(8-31)src/services/client_baileys.ts (1)
getClientBaileys(42-76)src/jobs/incoming.ts (1)
IncomingJob(12-189)
src/jobs/bind_bridge.ts (3)
src/amqp.ts (1)
amqpConsume(264-359)src/defaults.ts (4)
UNOAPI_EXCHANGE_BRIDGE_NAME(79-79)UNOAPI_QUEUE_LISTENER(86-86)UNOAPI_SERVER_NAME(66-66)UNOAPI_QUEUE_INCOMING(102-102)src/jobs/incoming_whatsmeow.ts (1)
IncomingWhatsmeow(5-16)
🔇 Additional comments (16)
src/services/media_store_file.ts (3)
5-5: LGTM!The import of the
pathmodule is appropriate for the new filename extension parsing logic.
191-193: LGTM!The defensive early return prevents potential null pointer errors downstream and provides clear logging for debugging.
215-215: LGTM!The conditional spread correctly handles cases where
mimeTypeis falsy, preventing undefined or false values from being added to the payload.src/amqp.ts (1)
344-348: LGTM: Plain routing key binding for external adapters.The conditional binding for the bridge exchange allows WhatsMeow and other external adapters to publish using just the phone number as the routing key (without the queue prefix). The comment clearly explains the intent, and the logic correctly applies only to the direct exchange.
src/services/client_baileys.ts (1)
370-381: LGTM: Call rejection message properly integrated.After rejecting the call, the code now sends a rejection message and dispatches it through the listener with type 'append'. The message structure correctly uses the response key id and maintains consistency with the broader message flow.
src/jobs/incoming_whatsmeow.ts (1)
5-16: LGTM: Clean bridge implementation for WhatsMeow.The
IncomingWhatsmeowclass provides a straightforward bridge that normalizes payloads from the WhatsMeow adapter and forwards them to the standard outgoing flow. The implementation correctly:
- Handles both
data.payloadand plaindataformats- Publishes to the broker exchange with topic routing
- Includes clear comments explaining the flow
src/jobs/bind_bridge.ts (2)
44-44: LGTM: Provider validation extended for WhatsMeow.The provider check now correctly includes 'whatsmeow' alongside 'forwarder' and 'baileys', allowing WhatsMeow sessions to be processed by the bridge binding flow.
62-107: LGTM: Provider-specific consumer initialization.The conditional logic correctly handles different provider types:
- Baileys/forwarder: initializes both listener and incoming consumers for the standard flow
- WhatsMeow: initializes only the IncomingWhatsmeow bridge consumer to forward to outgoing
The separation is appropriate since WhatsMeow uses an adapter-based model where the adapter handles the WhatsApp protocol, and UnoAPI just needs to bridge messages to webhooks.
src/jobs/outgoing.ts (1)
66-78: LGTM: Webhook fan-out for adapter-published payloads.When adapters publish only
{ payload }without specifying webhooks, this code correctly re-enqueues one message per configured webhook. This allows the provider-specific transforms in the single-webhook path below to be reused.public/index.html (5)
247-256: LGTM: Provider selection integrated into session form.The provider dropdown allows users to select between baileys, whatsmeow, and forwarder. The field is properly wired into form population (line 840) and submission (line 934), ensuring provider choice persists across edits.
523-524: LGTM: WhatsMeow connect flow with proper cleanup.The implementation correctly:
- Clears old polling intervals before starting new ones (lines 1119-1120)
- Polls the QR endpoint every 1.5 seconds to update the image (lines 1133-1148)
- Polls the session status every 2 seconds to detect connection (lines 1151-1168)
- Stops polling when connected (lines 1158-1159)
- Cleans up intervals when the modal is closed (lines 1175-1176)
This provides a smooth user experience for WhatsMeow connections without relying on websockets.
Also applies to: 1117-1177
728-749: LGTM: Dynamic action buttons based on session status.The button set now adapts to the session status:
- Online/connecting sessions show only Edit and Disconnect
- Offline/disconnected sessions show Edit, Connect, Delete, and Test
This provides a clearer UX by hiding irrelevant actions.
787-805: LGTM: Disconnect button implementation.The disconnect handler:
- Posts to the new
/sessions/${number}/disconnectendpoint- Provides user feedback via alert
- Refreshes the session list on success
This complements the WhatsMeow provider flow where sessions can be managed via API.
1018-1032: LGTM: Provider-aware connect routing.The connect button logic correctly routes sessions based on provider:
- WhatsMeow sessions trigger the polling-based flow
- Baileys/other sessions use the existing websocket-based flow
This ensures the appropriate connection method is used for each provider type.
src/services/incoming_amqp.ts (1)
32-48: LGTM: Payload normalization improves provider compatibility.The logic correctly:
- Normalizes Graph API reply context by ensuring both
context.idandcontext.message_idare present- Provides a sensible fallback for empty
tofields by extracting the group ID from the payloadThe shallow copy via spread operator is appropriate here since you're only adding/normalizing top-level and context fields without deeply mutating nested structures.
src/standalone.ts (1)
64-64: LGTM: Import additions and formatting are correct.The new imports (
amqpGetChannel,extractRoutingKeyFromBindingKey,IncomingJob) align with the provider-specific outgoing setup and are used appropriately. The single-line formatting of theamqpConsumecall is a minor style improvement.Also applies to: 75-75, 181-181
| const initExchange = async () => { | ||
| if (initialized) { | ||
| return | ||
| } | ||
| const channel = await amqpGetChannel() | ||
| await channel?.assertExchange(EXCHANGE, 'topic', { durable: true }) | ||
| await channel?.assertQueue('outgoing.baileys', { durable: true }) | ||
| await channel?.bindQueue('outgoing.baileys', EXCHANGE, 'provider.baileys.*') | ||
| await channel?.assertQueue('outgoing.baileys.dlq', { durable: true }) | ||
| // Do not declare or bind whatsmeow queues here; adapter owns its queue/bindings | ||
| initialized = true | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Configure dead-letter exchange for the DLQ and eliminate duplicate queue declarations.
Two issues:
-
Missing DLQ configuration: The
outgoing.baileys.dlqqueue is declared but lacks dead-letter-exchange (DLX) configuration. Without a DLX, messages routed to the DLQ have no path forward (no retry, no notification, no consumption). -
Code duplication: These same queue/binding declarations are repeated in
src/standalone.ts(lines 186-193), violating DRY. The comment on line 19 states "adapter owns its queue/bindings" for Whatsmeow, yetstandalone.tsdeclares those queues anyway.
Apply this diff to configure the DLQ properly:
- await channel?.assertQueue('outgoing.baileys.dlq', { durable: true })
+ await channel?.assertQueue('outgoing.baileys.dlq', {
+ durable: true,
+ deadLetterExchange: EXCHANGE,
+ deadLetterRoutingKey: 'dlq.notification'
+ })Consider extracting this initialization logic into src/amqp.ts to eliminate duplication between this file and src/standalone.ts.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const initExchange = async () => { | |
| if (initialized) { | |
| return | |
| } | |
| const channel = await amqpGetChannel() | |
| await channel?.assertExchange(EXCHANGE, 'topic', { durable: true }) | |
| await channel?.assertQueue('outgoing.baileys', { durable: true }) | |
| await channel?.bindQueue('outgoing.baileys', EXCHANGE, 'provider.baileys.*') | |
| await channel?.assertQueue('outgoing.baileys.dlq', { durable: true }) | |
| // Do not declare or bind whatsmeow queues here; adapter owns its queue/bindings | |
| initialized = true | |
| } | |
| const initExchange = async () => { | |
| if (initialized) { | |
| return | |
| } | |
| const channel = await amqpGetChannel() | |
| await channel?.assertExchange(EXCHANGE, 'topic', { durable: true }) | |
| await channel?.assertQueue('outgoing.baileys', { durable: true }) | |
| await channel?.bindQueue('outgoing.baileys', EXCHANGE, 'provider.baileys.*') | |
| await channel?.assertQueue('outgoing.baileys.dlq', { | |
| durable: true, | |
| deadLetterExchange: EXCHANGE, | |
| deadLetterRoutingKey: 'dlq.notification' | |
| }) | |
| // Do not declare or bind whatsmeow queues here; adapter owns its queue/bindings | |
| initialized = true | |
| } |
🤖 Prompt for AI Agents
In src/services/incoming_amqp.ts around lines 10-21, the outgoing.baileys.dlq
queue is declared without DLX configuration and the same queue/binding logic is
duplicated in src/standalone.ts; fix by extracting the exchange/queue
initialization into a shared src/amqp.ts function (e.g., initOutgoingExchange)
and call it from both places, remove the duplicated declarations in
standalone.ts, and in this file declare the DLQ with proper dead-letter settings
by asserting the DLX (assertExchange for a DLX, durable) and creating
outgoing.baileys.dlq with queue args: set 'x-dead-letter-exchange' to the main
exchange (or a retry exchange as desired) and an appropriate
'x-dead-letter-routing-key' so messages can be routed back/retried, keep queues
durable, and retain the comment that adapter-owned queues (Whatsmeow) should not
be declared here.
| let filePath: string | undefined | ||
| if (mediaPayload.mime_type) { | ||
| filePath = mediaStore.getFilePath(phone, mediaId!, mediaPayload.mime_type) | ||
| } else if (mediaPayload.filename) { | ||
| logger.debug('Missing mime type for media %s, deriving from filename', mediaId) | ||
| const ext = path.extname(mediaPayload.filename) | ||
| if (ext) { | ||
| filePath = `${phone}/${mediaId}${ext}` | ||
| } | ||
| } | ||
|
|
||
| if (!filePath) { | ||
| logger.warn('Media payload missing mime type and filename %s', mediaId) | ||
| return undefined | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate phone and mediaId to prevent path traversal.
The filePath construction at line 202 uses phone and mediaId directly without validation. If these contain path traversal sequences (e.g., ../, absolute paths), malicious actors could access or overwrite files outside the intended directory.
Consider adding validation before constructing the path:
+ // Validate phone and mediaId to prevent path traversal
+ if (phone.includes('..') || phone.includes('/') || mediaId.includes('..') || mediaId.includes('/')) {
+ logger.warn('Invalid phone or mediaId for media %s', mediaId)
+ return undefined
+ }
+
let filePath: string | undefined
if (mediaPayload.mime_type) {Alternatively, use a path sanitization library or normalize the path after construction to ensure it stays within the intended directory.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let filePath: string | undefined | |
| if (mediaPayload.mime_type) { | |
| filePath = mediaStore.getFilePath(phone, mediaId!, mediaPayload.mime_type) | |
| } else if (mediaPayload.filename) { | |
| logger.debug('Missing mime type for media %s, deriving from filename', mediaId) | |
| const ext = path.extname(mediaPayload.filename) | |
| if (ext) { | |
| filePath = `${phone}/${mediaId}${ext}` | |
| } | |
| } | |
| if (!filePath) { | |
| logger.warn('Media payload missing mime type and filename %s', mediaId) | |
| return undefined | |
| } | |
| // Validate phone and mediaId to prevent path traversal | |
| if (phone.includes('..') || phone.includes('/') || mediaId.includes('..') || mediaId.includes('/')) { | |
| logger.warn('Invalid phone or mediaId for media %s', mediaId) | |
| return undefined | |
| } | |
| let filePath: string | undefined | |
| if (mediaPayload.mime_type) { | |
| filePath = mediaStore.getFilePath(phone, mediaId!, mediaPayload.mime_type) | |
| } else if (mediaPayload.filename) { | |
| logger.debug('Missing mime type for media %s, deriving from filename', mediaId) | |
| const ext = path.extname(mediaPayload.filename) | |
| if (ext) { | |
| filePath = `${phone}/${mediaId}${ext}` | |
| } | |
| } | |
| if (!filePath) { | |
| logger.warn('Media payload missing mime type and filename %s', mediaId) | |
| return undefined | |
| } |
🤖 Prompt for AI Agents
In src/services/media_store_file.ts around lines 195 to 209, the code constructs
filePath using phone and mediaId directly which allows path traversal or
absolute path injection; validate and sanitize both values before constructing
paths by (1) rejecting or normalizing any input that contains path separators or
traversal sequences (../ or ..\) or leading separators, (2) enforcing a strict
whitelist (e.g., alphanumeric, hyphen, underscore) or explicit length limits for
phone and mediaId, and (3) after building the candidate path, use path.join
against the intended base directory and path.normalize and verify the normalized
path starts with the base directory to ensure it cannot escape; if validation
fails, log a warning and return undefined.
| await channel?.assertExchange('unoapi.outgoing', 'topic', { durable: true }) | ||
| await channel?.assertQueue('outgoing.baileys', { durable: true }) | ||
| await channel?.bindQueue('outgoing.baileys', 'unoapi.outgoing', 'provider.baileys.*') | ||
| // Ensure Whatsmeow queues exist too (created but not consumed here) | ||
| await channel?.assertQueue('outgoing.baileys.dlq', { durable: true }) | ||
| await channel?.assertQueue('outgoing.whatsmeow', { durable: true }) | ||
| await channel?.bindQueue('outgoing.whatsmeow', 'unoapi.outgoing', 'provider.whatsmeow.*') | ||
| await channel?.assertQueue('outgoing.whatsmeow.dlq', { durable: true }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Eliminate duplicate queue declarations.
These queue/binding declarations are duplicated in src/services/incoming_amqp.ts (lines 10-21), violating DRY. Additionally, line 19 of incoming_amqp.ts states "adapter owns its queue/bindings" for Whatsmeow, yet this code declares outgoing.whatsmeow and its DLQ anyway.
Consider consolidating all queue/exchange declarations into a single initialization function in src/amqp.ts and calling it once during broker startup. This would:
- Eliminate duplication
- Clarify ownership boundaries (which service declares which queues)
- Make the infrastructure setup more maintainable
Description
This PR updates the project to version v3.0.0 and introduces compatibility with the new WhatsMeow provider.
Key Changes
v3.0.0specifications and breaking changes.Adapter Requirement
A compatible adapter is required to use this new provider.
You can find it here: mbap-dev/provider-whatsmeow
Testing & Notes
v3.0.0changes apply.Summary by CodeRabbit
New Features
Bug Fixes