Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 79 additions & 24 deletions src/commands/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import type { PluginContext } from '../plugin-types.js';
import { getRepoFromGitRemote, normalizeGithubLabelPrefix, SecondaryRateLimitError, setVerboseLogger } from '../github.js';
import { getLockPathForJsonl, withFileLock } from '../file-lock.js';
import { resolveWorklogDir } from '../worklog-paths.js';
import path from 'node:path';
import { ProgressReporter, ProgressMode } from '../progress.js';
import throttler from '../github-throttler.js';
import { upsertIssuesFromWorkItems, importIssuesToWorkItems, GithubProgress, GithubSyncResult, SyncedItem, SyncErrorItem, FieldChange } from '../github-sync.js';
Expand Down Expand Up @@ -88,7 +91,10 @@ export default function register(ctx: PluginContext): void {
};

const progressMode = (options as any).progress as ProgressMode | undefined;
const progressReporter = new ProgressReporter({ mode: progressMode ?? (isJsonMode ? 'json' : undefined) });
const progressReporter = new ProgressReporter({
mode: progressMode ?? (isJsonMode ? 'json' : undefined),
rateMs: 250,
});
const renderProgress = (progress: GithubProgress) => {
if (progress.phase === 'push') {
const totalItems = Math.max(pushTotalItems, 0);
Expand All @@ -102,7 +108,7 @@ export default function register(ctx: PluginContext): void {
? currentBatchLength
: Math.min(Math.max(totalItems - batchIdx * BATCH_SIZE, 0), BATCH_SIZE);
const itemNumberInBatch = Math.min(Math.max(progress.current, 1), batchItemCount || BATCH_SIZE);
message = `Push: Batch ${batchIdx + 1}/${totalBatches} Item ${itemNumberInBatch}/${batchItemCount || BATCH_SIZE}`;
message = `Push: Batch ${batchIdx + 1}/${totalBatches} Completed ${itemNumberInBatch}/${batchItemCount || BATCH_SIZE}`;
}
// Append throttler stats to push message for diagnostic visibility
try {
Expand All @@ -125,26 +131,33 @@ export default function register(ctx: PluginContext): void {
};

try {
const githubConfig = resolveGithubConfig({ repo: options.repo, labelPrefix: options.labelPrefix });
const repoUrl = `https://github.com/${githubConfig.repo}/issues`;
if (!isJsonMode) {
console.log(`Pushing to ${repoUrl}`);
}
const items = db.getAll();
const comments = db.getAllComments();
// Acquire a per-repo file lock to serialize github push operations and
// avoid races where concurrent push runs update the last-push timestamp
// out-of-band and cause items to be skipped. Use the JSONL path as the
// lock target so it is repo-scoped and consistent with other file-locks.
const jsonlPath = path.join(resolveWorklogDir(), 'worklog-data.jsonl');
const lockPath = getLockPathForJsonl(jsonlPath);
await withFileLock(lockPath, async () => {
const githubConfig = resolveGithubConfig({ repo: options.repo, labelPrefix: options.labelPrefix });
const repoUrl = `https://github.com/${githubConfig.repo}/issues`;
if (!isJsonMode) {
console.log(`Pushing to ${repoUrl}`);
}
const items = db.getAll();
const comments = db.getAllComments();

let itemsToProcess = items;
let commentsToProcess = comments;
let lastPush: string | null = null;
// Pass DB to timestamp helpers when available so they may use metadata
const dbForMetadata = typeof db.getAll === 'function' && typeof (db as any).store === 'object' ? (db as any).store : undefined;
let itemsToProcess = items;
let commentsToProcess = comments;
let lastPush: string | null = null;
// Pass DB to timestamp helpers when available so they may use metadata
const dbForMetadata = typeof db.getAll === 'function' && typeof (db as any).store === 'object' ? (db as any).store : undefined;

// Eagerly capture writeLastPushTimestamp when the pre-filter module is
// available. It may be resolved during the pre-filter import below or
// via a standalone import before the batch loop.
let _writeLastPushTimestamp: ((ts: string, db?: { setMetadata?: (k: string, v: string) => void }, repo?: string | null) => void) | null = null;
// Eagerly capture writeLastPushTimestamp when the pre-filter module is
// available. It may be resolved during the pre-filter import below or
// via a standalone import before the batch loop.
let _writeLastPushTimestamp: ((ts: string, db?: { setMetadata?: (k: string, v: string) => void }, repo?: string | null) => void) | null = null;

const forceAll = Boolean(options.all) || Boolean(options.force);
const forceAll = Boolean(options.all) || Boolean(options.force);
if (options.force && !options.all) {
if (!isJsonMode) console.error('Warning: --force is deprecated and will be removed in a future release. Use --all instead.');
logLine('github push: --force is deprecated; use --all instead');
Expand All @@ -155,7 +168,7 @@ export default function register(ctx: PluginContext): void {
let preFilterDeletedWithoutIssueCount = 0;
if (forceAll) {
// Bypass pre-filter when --all (or deprecated --force) specified
if (!isJsonMode) console.log(`Full push (--all): processing all ${items.length} items`);
if (!isJsonMode && !options.id) console.log(`Full push (--all): processing all ${items.length} items`);
logLine('github push: --all mode enabled - processing all items');
// Still need the timestamp writer even in --all mode; resolve it here.
if (!_writeLastPushTimestamp) {
Expand All @@ -179,7 +192,7 @@ export default function register(ctx: PluginContext): void {
commentsToProcess = filteredComments;
preFilterSkippedCount = skippedCount;
preFilterDeletedWithoutIssueCount = deletedWithoutIssueCount;
if (!isJsonMode) {
if (!isJsonMode && !options.id) {
const parts: string[] = [];
if (skippedCount > 0) parts.push(`${skippedCount} unchanged since last push`);
if (deletedWithoutIssueCount > 0) parts.push(`${deletedWithoutIssueCount} deleted without issue number`);
Expand All @@ -199,15 +212,51 @@ export default function register(ctx: PluginContext): void {

// --id: restrict to a single work item when provided
if (options.id) {
const singleItem = itemsToProcess.find(i => i.id === options.id);
// When --id is supplied, bypass the pre-filter and always push the
// specified work item (do not require it to be a candidate in the
// pre-filtered set). This ensures explicit single-item pushes always
// run even if the pre-filter would otherwise exclude the item.
const singleItem = items.find(i => i.id === options.id);
if (!singleItem) {
throw new Error(`Work item '${options.id}' not found (or not a candidate for push).`);
throw new Error(`Work item '${options.id}' not found.`);
}
itemsToProcess = [singleItem];
commentsToProcess = commentsToProcess.filter(c => c.workItemId === options.id);
commentsToProcess = comments.filter(c => c.workItemId === options.id);
if (!isJsonMode) {
console.log(`Processing 1 of ${items.length} items (--id ${options.id})`);
}
logLine(`github push: --id mode; pushing single item ${options.id}`);
}

// Defensive: ensure we didn't miss any items that were updated since
// the last push timestamp. In rare race conditions or when the
// pre-filter behaved unexpectedly, an item with updatedAt > lastPush
// might be missing from itemsToProcess. Add any such items now so the
// push run is robust.
if (!forceAll && !options.id && lastPush) {
try {
const lastMs = new Date(lastPush).getTime();
if (!Number.isNaN(lastMs)) {
const existingIds = new Set(itemsToProcess.map(i => i.id));
const additional = items.filter(it => !existingIds.has(it.id)).filter(it => {
const updatedMs = new Date(it.updatedAt).getTime();
return !Number.isNaN(updatedMs) && updatedMs > lastMs;
});
if (additional.length > 0) {
// Append additional items preserving the natural order from `items`.
for (const it of items) {
if (additional.find(a => a.id === it.id)) itemsToProcess.push(it);
}
// Add comments for additional items
for (const c of comments) {
if (additional.find(a => a.id === c.workItemId)) commentsToProcess.push(c);
}
logLine(`github push: added ${additional.length} item(s) newer than lastPush`);
}
}
} catch (_) {}
}

// Capture push-start timestamp BEFORE processing begins so that items
// modified during the push window are re-processed on the next run.
const pushStartTimestamp = new Date().toISOString();
Expand All @@ -218,6 +267,7 @@ export default function register(ctx: PluginContext): void {

pushTotalItems = itemsToProcess.length;


// Process items in fixed batches of 10 so progress is persisted after
// each batch and a single failure does not require reprocessing everything.
const totalBatches = Math.max(Math.ceil(itemsToProcess.length / BATCH_SIZE), 1);
Expand Down Expand Up @@ -266,6 +316,10 @@ export default function register(ctx: PluginContext): void {
currentBatchLength = batchItems.length;

logLine(`github push: batch ${batchIndex + 1}/${totalBatches} items=${batchItems.length}`);
// Diagnostic: list batch item ids for debugging why items may be skipped
try {
logLine(`github push: batch ${batchIndex + 1} ids=${batchItems.map(i => i.id).join(',')}`);
} catch (_) {}

let batchResult;
try {
Expand Down Expand Up @@ -512,6 +566,7 @@ export default function register(ctx: PluginContext): void {
}
}
logLine(`--- github push end ${new Date().toISOString()} ---`);
});
} catch (error) {
logLine(`GitHub sync failed: ${(error as Error).message}`);
output.error(`GitHub sync failed: ${(error as Error).message}`, { success: false, error: (error as Error).message });
Expand Down
2 changes: 2 additions & 0 deletions src/github-pre-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ export function filterItemsForPush(items: WorkItem[], comments: Comment[], lastP
if (item.githubIssueNumber == null) return true;
const updatedMs = new Date(item.updatedAt).getTime();
if (Number.isNaN(updatedMs)) return true; // treat unknown updatedAt as changed
// Compare against the last-push timestamp.
// (Explicit --id pushes bypass this filter.)
return updatedMs > lastMs;
});

Expand Down
106 changes: 54 additions & 52 deletions src/github-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export async function upsertIssuesFromWorkItems(
const updatedItems: WorkItem[] = [...items];
const result: GithubSyncResult = { updated: 0, created: 0, closed: 0, skipped: 0, errors: [], syncedItems: [], errorItems: [] };
const updatedById = new Map<string, WorkItem>();
let processed = 0;
let completed = 0;
let skippedUpdates = 0;

const sortCommentsByCreatedAt = (left: Comment, right: Comment) => {
Expand Down Expand Up @@ -253,51 +253,48 @@ export async function upsertIssuesFromWorkItems(
title.length <= maxLen ? title : title.slice(0, maxLen - 1) + '\u2026';

async function upsertMapper(item: WorkItem, idx: number) {
if (onProgress) {
emitProgress('push', idx + 1, items.length);
}
// Guard: skip deleted items that have no GitHub issue (prevent accidental creation)
if (item.status === 'deleted' && !item.githubIssueNumber) {
if (onVerboseLog) {
onVerboseLog(`[upsert] skip deleted item ${item.id} (no githubIssueNumber)`);
try {
// Guard: skip deleted items that have no GitHub issue (prevent accidental creation)
if (item.status === 'deleted' && !item.githubIssueNumber) {
if (onVerboseLog) {
onVerboseLog(`[upsert] skip deleted item ${item.id} (no githubIssueNumber)`);
}
skippedUpdates += 1;
return;
}
skippedUpdates += 1;
return;
}
const itemComments = byItemId.get(item.id) || [];
const shouldSyncComments = commentNeedsSync(item, itemComments);
increment('items.processed');
if (
item.githubIssueNumber &&
item.githubIssueUpdatedAt &&
new Date(item.updatedAt).getTime() <= new Date(item.githubIssueUpdatedAt).getTime() &&
!shouldSyncComments
) {
if (onVerboseLog) {
onVerboseLog(`[upsert] skip ${item.id} (no issue or comment changes)`);
const itemComments = byItemId.get(item.id) || [];
const shouldSyncComments = commentNeedsSync(item, itemComments);
increment('items.processed');
if (
item.githubIssueNumber &&
item.githubIssueUpdatedAt &&
new Date(item.updatedAt).getTime() <= new Date(item.githubIssueUpdatedAt).getTime() &&
!shouldSyncComments
) {
if (onVerboseLog) {
onVerboseLog(`[upsert] skip ${item.id} (no issue or comment changes)`);
}
skippedUpdates += 1;
return;
}
skippedUpdates += 1;
return;
}
const payload = workItemToIssuePayload(item, itemComments, labelPrefix, items);
const payload = workItemToIssuePayload(item, itemComments, labelPrefix, items);

try {
let issue: GithubIssueRecord | null = null;
let issueNumber = item.githubIssueNumber;
let issueUpdatedAt = item.githubIssueUpdatedAt || null;
const shouldUpdateIssue = !item.githubIssueNumber
|| !item.githubIssueUpdatedAt
|| new Date(item.updatedAt).getTime() > new Date(item.githubIssueUpdatedAt).getTime();
if (shouldUpdateIssue) {
if (shouldUpdateIssue) {
const upsertStart = Date.now();
if (onVerboseLog) {
onVerboseLog(`[upsert] ${item.githubIssueNumber ? 'update' : 'create'} ${item.id}`);
}
if (item.githubIssueNumber) {
increment('api.issue.update');
// updateGithubIssueAsync already schedules via the central throttler
// internally (see src/github.ts). Avoid double-scheduling here.
issue = await updateGithubIssueAsync(config, item.githubIssueNumber!, payload);
if (item.githubIssueNumber) {
increment('api.issue.update');
// updateGithubIssueAsync already schedules via the central throttler
// internally (see src/github.ts). Avoid double-scheduling here.
issue = await updateGithubIssueAsync(config, item.githubIssueNumber!, payload);
if (item.status === 'deleted') {
result.closed += 1;
result.syncedItems.push({
Expand All @@ -315,14 +312,14 @@ export async function upsertIssuesFromWorkItems(
issueNumber: item.githubIssueNumber,
});
}
} else {
increment('api.issue.create');
// createGithubIssueAsync schedules via the central throttler itself.
issue = await createGithubIssueAsync(config, {
title: payload.title,
body: payload.body,
labels: payload.labels,
});
} else {
increment('api.issue.create');
// createGithubIssueAsync schedules via the central throttler itself.
issue = await createGithubIssueAsync(config, {
title: payload.title,
body: payload.body,
labels: payload.labels,
});
result.created += 1;
result.syncedItems.push({
action: 'created',
Expand Down Expand Up @@ -350,17 +347,17 @@ export async function upsertIssuesFromWorkItems(

const shouldSyncCommentsNow = itemComments.length > 0 && (shouldSyncComments || shouldUpdateIssue);
if (shouldSyncCommentsNow && issueNumber) {
const commentListStart = Date.now();
increment('api.comment.list');
// listGithubIssueCommentsAsync now schedules internally via the throttler
// (see src/github.ts). Call it directly to avoid double-scheduling.
const existingComments = await listGithubIssueCommentsAsync(config, issueNumber!);
timing.commentListMs += Date.now() - commentListStart;
const commentUpsertStart = Date.now();
const commentSummary = await upsertGithubIssueCommentsAsync(config, issueNumber, itemComments, existingComments);
timing.commentUpsertMs += Date.now() - commentUpsertStart;
// small yield after comment work
if (idx % 5 === 0) await new Promise((res) => setImmediate(res));
const commentListStart = Date.now();
increment('api.comment.list');
// listGithubIssueCommentsAsync now schedules internally via the throttler
// (see src/github.ts). Call it directly to avoid double-scheduling.
const existingComments = await listGithubIssueCommentsAsync(config, issueNumber!);
timing.commentListMs += Date.now() - commentListStart;
const commentUpsertStart = Date.now();
const commentSummary = await upsertGithubIssueCommentsAsync(config, issueNumber, itemComments, existingComments);
timing.commentUpsertMs += Date.now() - commentUpsertStart;
// small yield after comment work
if (idx % 5 === 0) await new Promise((res) => setImmediate(res));
increment('api.comment.create', commentSummary.created || 0);
increment('api.comment.update', commentSummary.updated || 0);
result.commentsCreated = (result.commentsCreated || 0) + commentSummary.created;
Expand All @@ -384,6 +381,11 @@ export async function upsertIssuesFromWorkItems(
error: (error as Error).message,
});
updatedById.set(item.id, item);
} finally {
completed += 1;
if (onProgress) {
emitProgress('push', completed, items.length);
}
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/github-throttler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
* The clock is injectable to allow deterministic unit tests.
*/

import { AsyncLocalStorage } from 'node:async_hooks';

export type Clock = { now(): number };

export type ThrottlerOptions = {
Expand Down Expand Up @@ -42,6 +44,10 @@ export class TokenBucketThrottler {
private retryCount = 0;
private errorCount = 0;

// Marks execution that already runs inside this throttler so nested
// schedule() calls can run inline without deadlocking on concurrency.
private readonly taskContext = new AsyncLocalStorage<boolean>();

// Expose simple stats without blocking the throttler operation
getStats() {
return {
Expand Down Expand Up @@ -103,6 +109,13 @@ export class TokenBucketThrottler {
}

schedule<T>(fn: () => Promise<T> | T): Promise<T> {
// Reentrant path: if we are already inside a scheduled task for this
// throttler instance, execute inline to avoid self-deadlock when the
// outer task has consumed available concurrency slots.
if (this.taskContext.getStore()) {
return Promise.resolve().then(fn);
}

return new Promise<T>((resolve, reject) => {
const task: Task<T> = { fn, resolve, reject } as Task<T>;
this.queue.push(task as Task<unknown>);
Expand Down Expand Up @@ -162,7 +175,7 @@ export class TokenBucketThrottler {

// Execute task
Promise.resolve()
.then(() => task.fn())
.then(() => this.taskContext.run(true, () => task.fn()))
.then((res) => {
this.active -= 1;
(task.resolve as (v: unknown) => void)(res);
Expand Down
Loading
Loading