From 47bb175a4f91623d9d566dc96da6ce7a6629a519 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 9 May 2026 13:37:53 +0000 Subject: [PATCH] n8n: real-time training loop + decoupled posting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit instagram-approval: after every tap, immediately fetch /candidates?limit=1 and send the next photo as a fresh inline-keyboard message — the user's tap chains back into this same workflow, so the loop is user-paced. When the pool is exhausted, send an 'all caught up' summary with the backlog count + cumulative training stats. instagram-discover: cron throttled from every-30-min to daily 09:00. The chain handles ongoing training; the daily run only kickstarts a session if the user hasn't been tapping. Limit reduced from 3 → 1 so each kickstart sends a single photo (chain takes over). --- stacks/n8n/workflows/instagram-approval.json | 461 ++++++++----------- stacks/n8n/workflows/instagram-discover.json | 15 +- 2 files changed, 196 insertions(+), 280 deletions(-) diff --git a/stacks/n8n/workflows/instagram-approval.json b/stacks/n8n/workflows/instagram-approval.json index 5533180c..72d40cfc 100644 --- a/stacks/n8n/workflows/instagram-approval.json +++ b/stacks/n8n/workflows/instagram-approval.json @@ -15,26 +15,20 @@ "name": "Telegram Webhook", "type": "n8n-nodes-base.webhook", "typeVersion": 2, - "position": [ - 250, - 400 - ], + "position": [250, 400], "webhookId": "f2c7c254-ebaf-4f66-b1b4-5c1629c07e08", - "notes": "Listens for inline-button taps. Requires a Telegram credential bound to the same bot whose token is in TELEGRAM_BOT_TOKEN." + "notes": "Receives Telegram inline-button taps." }, { "parameters": { - "jsCode": "// Webhook puts Telegram update under .body; Telegram trigger puts it at root\nconst raw = $input.first().json;\nconst update = raw.body || raw;\nconst cb = update.callback_query || {};\nconst data = cb.data || '';\nconst [action, assetId] = data.split(':');\nconst message = cb.message || {};\nconst chatId = (message.chat || {}).id;\nconst messageId = message.message_id;\nconst originalCaption = message.caption || '';\nconst callbackQueryId = cb.id;\nreturn [{\n json: {\n action,\n asset_id: assetId,\n chat_id: chatId,\n message_id: messageId,\n original_caption: originalCaption,\n callback_query_id: callbackQueryId,\n }\n}];" + "jsCode": "const raw = $input.first().json;\nconst update = raw.body || raw;\nconst cb = update.callback_query || {};\nconst data = cb.data || '';\nconst [action, assetId] = data.split(':');\nconst message = cb.message || {};\nconst chatId = (message.chat || {}).id;\nconst messageId = message.message_id;\nconst originalCaption = message.caption || '';\nconst callbackQueryId = cb.id;\nreturn [{\n json: {\n action,\n asset_id: assetId,\n chat_id: chatId,\n message_id: messageId,\n original_caption: originalCaption,\n callback_query_id: callbackQueryId,\n }\n}];" }, "id": "parse-callback", "name": "Parse callback_data", "type": "n8n-nodes-base.code", "typeVersion": 2, - "position": [ - 470, - 400 - ], - "notes": "Splits callback_data into action + asset_id, captures chat_id/message_id/caption for later edits and answerCallbackQuery." + "position": [470, 400], + "notes": "Splits callback_data into action + asset_id." }, { "parameters": { @@ -42,44 +36,26 @@ "values": [ { "conditions": { - "options": { - "caseSensitive": true, - "leftValue": "", - "typeValidation": "strict" - }, - "conditions": [ - { - "id": "is-approve", - "leftValue": "={{ $json.action }}", - "rightValue": "approve", - "operator": { - "type": "string", - "operation": "equals" - } - } - ], + "options": {"caseSensitive": true, "leftValue": "", "typeValidation": "strict"}, + "conditions": [{ + "id": "is-approve", + "leftValue": "={{ $json.action }}", + "rightValue": "approve", + "operator": {"type": "string", "operation": "equals"} + }], "combinator": "and" }, "outputKey": "approve" }, { "conditions": { - "options": { - "caseSensitive": true, - "leftValue": "", - "typeValidation": "strict" - }, - "conditions": [ - { - "id": "is-reject", - "leftValue": "={{ $json.action }}", - "rightValue": "reject", - "operator": { - "type": "string", - "operation": "equals" - } - } - ], + "options": {"caseSensitive": true, "leftValue": "", "typeValidation": "strict"}, + "conditions": [{ + "id": "is-reject", + "leftValue": "={{ $json.action }}", + "rightValue": "reject", + "operator": {"type": "string", "operation": "equals"} + }], "combinator": "and" }, "outputKey": "reject" @@ -92,11 +68,8 @@ "name": "Switch on action", "type": "n8n-nodes-base.switch", "typeVersion": 3.2, - "position": [ - 690, - 400 - ], - "notes": "Branches on action; unknown actions fall through and are dropped." + "position": [690, 400], + "notes": "approve | reject branches; unknown actions dropped." }, { "parameters": { @@ -105,33 +78,21 @@ "sendHeaders": true, "headerParameters": { "parameters": [ - { - "name": "Authorization", - "value": "=Bearer {{ $env.INSTAGRAM_POSTER_TOKEN }}" - }, - { - "name": "Content-Type", - "value": "application/json" - } + {"name": "Content-Type", "value": "application/json"} ] }, "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({ asset_id: $json.asset_id }) }}", - "options": { - "timeout": 30000 - } + "options": {"timeout": 30000} }, "id": "approve-enqueue", - "name": "Approve: enqueue asset", + "name": "Approve: enqueue + log decision", "type": "n8n-nodes-base.httpRequest", "typeVersion": 4.2, - "position": [ - 910, - 250 - ], + "position": [910, 250], "onError": "continueErrorOutput", - "notes": "Calls instagram-poster /enqueue. continueErrorOutput so we can fall through to a Telegram error message on failure." + "notes": "Calls /enqueue → moves story_queue row to 'approved' (= backlog) AND records decision row with embedding for CLIP scoring." }, { "parameters": { @@ -140,61 +101,43 @@ "sendHeaders": true, "headerParameters": { "parameters": [ - { - "name": "Authorization", - "value": "=Bearer {{ $env.INSTAGRAM_POSTER_TOKEN }}" - }, - { - "name": "Content-Type", - "value": "application/json" - } + {"name": "Content-Type", "value": "application/json"} ] }, "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({ asset_id: $json.asset_id }) }}", - "options": { - "timeout": 30000 - } + "options": {"timeout": 30000} }, "id": "reject-mark", - "name": "Reject: mark seen", + "name": "Reject: mark seen + log decision", "type": "n8n-nodes-base.httpRequest", "typeVersion": 4.2, - "position": [ - 910, - 550 - ], + "position": [910, 550], "onError": "continueErrorOutput", - "notes": "Calls instagram-poster /reject so the asset is recorded and not re-surfaced." + "notes": "Calls /reject → records decision (negative training signal); doesn't add to backlog." }, { "parameters": { - "jsCode": "const upstream = $('Parse callback_data').item.json;\nconst suffix = '\\n\\nQueued for posting';\nreturn [{ json: { ...upstream, new_caption: (upstream.original_caption || '') + suffix } }];" + "jsCode": "const upstream = $('Parse callback_data').item.json;\nreturn [{ json: { ...upstream, new_caption: (upstream.original_caption || '') + '\\n\\n✅ Saved to backlog' } }];" }, "id": "approve-caption", "name": "Approve: build new caption", "type": "n8n-nodes-base.code", "typeVersion": 2, - "position": [ - 1130, - 250 - ], - "notes": "Append a confirmation suffix to the original caption." + "position": [1130, 250], + "notes": "Append confirmation." }, { "parameters": { - "jsCode": "const upstream = $('Parse callback_data').item.json;\nconst suffix = '\\n\\nRejected';\nreturn [{ json: { ...upstream, new_caption: (upstream.original_caption || '') + suffix } }];" + "jsCode": "const upstream = $('Parse callback_data').item.json;\nreturn [{ json: { ...upstream, new_caption: (upstream.original_caption || '') + '\\n\\n❌ Rejected' } }];" }, "id": "reject-caption", "name": "Reject: build new caption", "type": "n8n-nodes-base.code", "typeVersion": 2, - "position": [ - 1130, - 550 - ], - "notes": "Append rejection suffix to the original caption." + "position": [1130, 550], + "notes": "Append rejection." }, { "parameters": { @@ -203,28 +146,20 @@ "sendHeaders": true, "headerParameters": { "parameters": [ - { - "name": "Content-Type", - "value": "application/json" - } + {"name": "Content-Type", "value": "application/json"} ] }, "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({ chat_id: $json.chat_id, message_id: $json.message_id, caption: $json.new_caption, parse_mode: 'HTML' }) }}", - "options": { - "timeout": 30000 - } + "options": {"timeout": 30000} }, "id": "edit-caption", "name": "Telegram editMessageCaption", "type": "n8n-nodes-base.httpRequest", "typeVersion": 4.2, - "position": [ - 1350, - 400 - ], - "notes": "Updates the original DM caption to show the resulting state. Strips inline buttons in the same call by omitting reply_markup combined with the next node." + "position": [1350, 400], + "notes": "Updates the original DM caption to show the resulting state." }, { "parameters": { @@ -233,28 +168,20 @@ "sendHeaders": true, "headerParameters": { "parameters": [ - { - "name": "Content-Type", - "value": "application/json" - } + {"name": "Content-Type", "value": "application/json"} ] }, "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({ chat_id: $json.chat_id, message_id: $json.message_id, reply_markup: { inline_keyboard: [] } }) }}", - "options": { - "timeout": 30000 - } + "options": {"timeout": 30000} }, "id": "edit-reply-markup", "name": "Telegram editMessageReplyMarkup", "type": "n8n-nodes-base.httpRequest", "typeVersion": 4.2, - "position": [ - 1570, - 400 - ], - "notes": "Strips the inline approve/reject buttons so the original DM no longer offers them after a decision." + "position": [1570, 400], + "notes": "Strip the inline buttons from the original DM." }, { "parameters": { @@ -263,28 +190,127 @@ "sendHeaders": true, "headerParameters": { "parameters": [ - { - "name": "Content-Type", - "value": "application/json" - } + {"name": "Content-Type", "value": "application/json"} ] }, "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({ callback_query_id: $json.callback_query_id, text: 'Recorded' }) }}", - "options": { - "timeout": 15000 - } + "options": {"timeout": 15000} }, "id": "answer-callback", "name": "Telegram answerCallbackQuery", "type": "n8n-nodes-base.httpRequest", "typeVersion": 4.2, - "position": [ - 1790, - 400 - ], - "notes": "Dismisses the loading spinner on the user's tap." + "position": [1790, 400], + "notes": "Dismiss the spinner on the user's tap." + }, + { + "parameters": { + "method": "GET", + "url": "={{ $env.INSTAGRAM_POSTER_INTERNAL_URL }}/candidates?limit=1", + "options": {"timeout": 60000} + }, + "id": "get-next", + "name": "Get next candidate", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [2010, 400], + "notes": "Real-time training loop: after every approve/reject, immediately fetch the next ranked candidate so the user can keep tapping. Endpoint excludes already-decided assets so no repeats." + }, + { + "parameters": { + "method": "GET", + "url": "={{ $env.INSTAGRAM_POSTER_INTERNAL_URL }}/queue?status=approved", + "options": {"timeout": 30000} + }, + "id": "backlog-count", + "name": "Get backlog count", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [2230, 400], + "notes": "Count of approved-but-not-yet-posted rows. Shown to the user so they know how many photos are queued for posting." + }, + { + "parameters": { + "jsCode": "// Decide: send next candidate, or 'all caught up' message.\nconst nextResp = $('Get next candidate').item.json;\nconst backlog = $input.first().json;\nconst chatId = $('Parse callback_data').item.json.chat_id;\nconst candidates = (nextResp && nextResp.candidates) || [];\nconst stats = (nextResp && nextResp.stats) || {};\nconst backlogCount = Array.isArray(backlog) ? backlog.length : 0;\n\nif (candidates.length === 0) {\n return [{ json: { has_next: false, chat_id: chatId, backlog_count: backlogCount, stats } }];\n}\n\nconst c = candidates[0];\nconst score = (typeof c.score === 'number') ? c.score.toFixed(2) : '–';\nconst takenDate = c.taken_at ? c.taken_at.slice(0, 10) : '';\nconst lines = [\n '📸 Next',\n '',\n 'File: ' + (c.filename || c.asset_id),\n];\nif (takenDate) lines.push('Taken: ' + takenDate);\nlines.push('Score: ' + score + (c.has_embedding ? '' : ' (no embedding)'));\nlines.push('', 'Backlog: ' + backlogCount + ' · trained on ' + (stats.approved || 0) + '✅ / ' + (stats.rejected || 0) + '❌');\nreturn [{ json: { has_next: true, asset_id: c.asset_id, caption: lines.join('\\n'), chat_id: chatId } }];" + }, + "id": "build-next", + "name": "Build next-candidate payload", + "type": "n8n-nodes-base.code", + "typeVersion": 2, + "position": [2450, 400], + "notes": "Assemble caption with score + cumulative stats + backlog count, OR signal 'all caught up'." + }, + { + "parameters": { + "rules": { + "values": [{ + "conditions": { + "options": {"caseSensitive": true, "leftValue": "", "typeValidation": "strict"}, + "conditions": [{ + "id": "has-next", + "leftValue": "={{ $json.has_next }}", + "rightValue": true, + "operator": {"type": "boolean", "operation": "true"} + }], + "combinator": "and" + }, + "outputKey": "next" + }] + }, + "options": {"fallbackOutput": "extra"} + }, + "id": "switch-has-next", + "name": "Branch: has next?", + "type": "n8n-nodes-base.switch", + "typeVersion": 3.2, + "position": [2670, 400], + "notes": "Route to sendPhoto if there's another candidate, otherwise to 'all caught up' message." + }, + { + "parameters": { + "method": "POST", + "url": "=https://api.telegram.org/bot{{ $env.TELEGRAM_BOT_TOKEN }}/sendPhoto", + "sendHeaders": true, + "headerParameters": { + "parameters": [ + {"name": "Content-Type", "value": "application/json"} + ] + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "={{ JSON.stringify({ chat_id: $json.chat_id, photo: $env.PUBLIC_INSTAGRAM_POSTER_URL + '/image/' + $json.asset_id, caption: $json.caption, parse_mode: 'HTML', reply_markup: { inline_keyboard: [[ { text: '✅ Approve', callback_data: 'approve:' + $json.asset_id }, { text: '❌ Reject', callback_data: 'reject:' + $json.asset_id } ]] } }) }}", + "options": {"timeout": 30000} + }, + "id": "send-next", + "name": "Telegram sendPhoto (next)", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [2890, 250], + "notes": "Sends the next candidate with its own approve/reject buttons; tap chains back into this same workflow." + }, + { + "parameters": { + "method": "POST", + "url": "=https://api.telegram.org/bot{{ $env.TELEGRAM_BOT_TOKEN }}/sendMessage", + "sendHeaders": true, + "headerParameters": { + "parameters": [ + {"name": "Content-Type", "value": "application/json"} + ] + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "={{ JSON.stringify({ chat_id: $json.chat_id, text: '🎉 All caught up — nothing more tagged in Immich.\\n\\nBacklog: ' + $json.backlog_count + ' approved photos waiting to post.\\nTrained on ' + (($json.stats && $json.stats.approved) || 0) + '✅ / ' + (($json.stats && $json.stats.rejected) || 0) + '❌.', parse_mode: 'HTML' }) }}", + "options": {"timeout": 15000} + }, + "id": "send-empty", + "name": "Telegram all-caught-up", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [2890, 550], + "notes": "When no more candidates, tell the user how big the backlog is so they know how many days of content are queued." }, { "parameters": { @@ -294,11 +320,8 @@ "name": "Build error message", "type": "n8n-nodes-base.code", "typeVersion": 2, - "position": [ - 1130, - 750 - ], - "notes": "Catches non-2xx from /enqueue or /reject and formats a Telegram alert text." + "position": [1130, 750], + "notes": "Catches non-2xx from /enqueue or /reject." }, { "parameters": { @@ -307,171 +330,61 @@ "sendHeaders": true, "headerParameters": { "parameters": [ - { - "name": "Content-Type", - "value": "application/json" - } + {"name": "Content-Type", "value": "application/json"} ] }, "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({ chat_id: $json.chat_id, text: $json.text }) }}", - "options": { - "timeout": 15000 - } + "options": {"timeout": 15000} }, "id": "telegram-error-msg", "name": "Telegram error notice", "type": "n8n-nodes-base.httpRequest", "typeVersion": 4.2, - "position": [ - 1350, - 750 - ], - "notes": "Sends the error text to the user as a follow-up message." + "position": [1350, 750], + "notes": "Sends the error text to the user." } ], "connections": { - "Parse callback_data": { - "main": [ - [ - { - "node": "Switch on action", - "type": "main", - "index": 0 - } - ] - ] - }, + "Telegram Webhook": {"main": [[{"node": "Parse callback_data", "type": "main", "index": 0}]]}, + "Parse callback_data": {"main": [[{"node": "Switch on action", "type": "main", "index": 0}]]}, "Switch on action": { "main": [ - [ - { - "node": "Approve: enqueue asset", - "type": "main", - "index": 0 - } - ], - [ - { - "node": "Reject: mark seen", - "type": "main", - "index": 0 - } - ] + [{"node": "Approve: enqueue + log decision", "type": "main", "index": 0}], + [{"node": "Reject: mark seen + log decision", "type": "main", "index": 0}] ] }, - "Approve: enqueue asset": { + "Approve: enqueue + log decision": { "main": [ - [ - { - "node": "Approve: build new caption", - "type": "main", - "index": 0 - } - ], - [ - { - "node": "Build error message", - "type": "main", - "index": 0 - } - ] + [{"node": "Approve: build new caption", "type": "main", "index": 0}], + [{"node": "Build error message", "type": "main", "index": 0}] ] }, - "Reject: mark seen": { + "Reject: mark seen + log decision": { "main": [ - [ - { - "node": "Reject: build new caption", - "type": "main", - "index": 0 - } - ], - [ - { - "node": "Build error message", - "type": "main", - "index": 0 - } - ] + [{"node": "Reject: build new caption", "type": "main", "index": 0}], + [{"node": "Build error message", "type": "main", "index": 0}] ] }, - "Approve: build new caption": { + "Approve: build new caption": {"main": [[{"node": "Telegram editMessageCaption", "type": "main", "index": 0}]]}, + "Reject: build new caption": {"main": [[{"node": "Telegram editMessageCaption", "type": "main", "index": 0}]]}, + "Telegram editMessageCaption": {"main": [[{"node": "Telegram editMessageReplyMarkup", "type": "main", "index": 0}]]}, + "Telegram editMessageReplyMarkup": {"main": [[{"node": "Telegram answerCallbackQuery", "type": "main", "index": 0}]]}, + "Telegram answerCallbackQuery": {"main": [[{"node": "Get next candidate", "type": "main", "index": 0}]]}, + "Get next candidate": {"main": [[{"node": "Get backlog count", "type": "main", "index": 0}]]}, + "Get backlog count": {"main": [[{"node": "Build next-candidate payload", "type": "main", "index": 0}]]}, + "Build next-candidate payload": {"main": [[{"node": "Branch: has next?", "type": "main", "index": 0}]]}, + "Branch: has next?": { "main": [ - [ - { - "node": "Telegram editMessageCaption", - "type": "main", - "index": 0 - } - ] + [{"node": "Telegram sendPhoto (next)", "type": "main", "index": 0}], + [{"node": "Telegram all-caught-up", "type": "main", "index": 0}] ] }, - "Reject: build new caption": { - "main": [ - [ - { - "node": "Telegram editMessageCaption", - "type": "main", - "index": 0 - } - ] - ] - }, - "Telegram editMessageCaption": { - "main": [ - [ - { - "node": "Telegram editMessageReplyMarkup", - "type": "main", - "index": 0 - } - ] - ] - }, - "Telegram editMessageReplyMarkup": { - "main": [ - [ - { - "node": "Telegram answerCallbackQuery", - "type": "main", - "index": 0 - } - ] - ] - }, - "Build error message": { - "main": [ - [ - { - "node": "Telegram error notice", - "type": "main", - "index": 0 - } - ] - ] - }, - "Telegram Webhook": { - "main": [ - [ - { - "node": "Parse callback_data", - "type": "main", - "index": 0 - } - ] - ] - } - }, - "settings": { - "executionOrder": "v1", - "saveExecutionProgress": false, - "saveManualExecutions": true + "Build error message": {"main": [[{"node": "Telegram error notice", "type": "main", "index": 0}]]} }, + "settings": {"executionOrder": "v1", "saveExecutionProgress": false, "saveManualExecutions": true}, "staticData": null, - "meta": { - "templateCredsSetupCompleted": false - }, + "meta": {"templateCredsSetupCompleted": false}, "pinData": {} -} \ No newline at end of file +} diff --git a/stacks/n8n/workflows/instagram-discover.json b/stacks/n8n/workflows/instagram-discover.json index 1b5b506e..86b1d97e 100644 --- a/stacks/n8n/workflows/instagram-discover.json +++ b/stacks/n8n/workflows/instagram-discover.json @@ -7,20 +7,23 @@ { "parameters": { "rule": { - "interval": [{"field": "minutes", "minutesInterval": 30}] + "interval": [{ + "field": "cronExpression", + "expression": "0 9 * * *" + }] } }, - "id": "cron-30min", - "name": "Every 30 minutes", + "id": "cron-daily-9", + "name": "Daily 09:00", "type": "n8n-nodes-base.scheduleTrigger", "typeVersion": 1.1, "position": [250, 300], - "notes": "Trigger every 30 minutes." + "notes": "Once a day kickstart. Sends 1 candidate so the user can start a training session by tapping. The approval workflow's chain takes over from there — every approve/reject sends the next candidate immediately. Daily cadence avoids spamming Telegram if the user is actively training; the loop is user-paced." }, { "parameters": { "method": "GET", - "url": "={{ $env.INSTAGRAM_POSTER_INTERNAL_URL }}/candidates?limit=3", + "url": "={{ $env.INSTAGRAM_POSTER_INTERNAL_URL }}/candidates?limit=1", "options": {"timeout": 60000} }, "id": "candidates", @@ -89,7 +92,7 @@ } ], "connections": { - "Every 30 minutes": {"main": [[{"node": "Get top-3 ranked candidates", "type": "main", "index": 0}]]}, + "Daily 09:00": {"main": [[{"node": "Get top-3 ranked candidates", "type": "main", "index": 0}]]}, "Get top-3 ranked candidates": {"main": [[{"node": "Split candidates", "type": "main", "index": 0}]]}, "Split candidates": {"main": [[{"node": "Loop one at a time", "type": "main", "index": 0}]]}, "Loop one at a time": {"main": [[{"node": "Build caption", "type": "main", "index": 0}]]},