handletelegram: add log for stuck update handlers

This commit is contained in:
Tulir Asokan
2026-03-03 16:22:41 +02:00
parent 67adededff
commit a84dd2f30c
2 changed files with 92 additions and 50 deletions

View File

@@ -178,53 +178,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
}
dispatcher := tg.NewUpdateDispatcher()
dispatcher.OnFallback(client.onEntityUpdate)
dispatcher.OnNewMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateNewMessage) error {
return client.onUpdateNewMessage(ctx, e, update)
})
dispatcher.OnNewChannelMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateNewChannelMessage) error {
return client.onUpdateNewMessage(ctx, e, update)
})
dispatcher.OnChannel(client.onUpdateChannel)
dispatcher.OnUserName(client.onUserName)
dispatcher.OnDeleteMessages(func(ctx context.Context, e tg.Entities, update *tg.UpdateDeleteMessages) error {
return client.onDeleteMessages(ctx, 0, update)
})
dispatcher.OnDeleteChannelMessages(func(ctx context.Context, e tg.Entities, update *tg.UpdateDeleteChannelMessages) error {
return client.onDeleteMessages(ctx, update.ChannelID, update)
})
dispatcher.OnEditMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateEditMessage) error {
return client.onMessageEdit(ctx, update)
})
dispatcher.OnEditChannelMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateEditChannelMessage) error {
return client.onMessageEdit(ctx, update)
})
dispatcher.OnUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateUserTyping) error {
return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeUser, update.UserID, 0), client.senderForUserID(update.UserID), update.Action)
})
dispatcher.OnChatUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChatUserTyping) error {
if update.FromID.TypeID() != tg.PeerUserTypeID {
log.Warn().Str("from_id_type", update.FromID.TypeName()).Msg("unsupported from_id type")
return nil
}
return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID, 0), client.getPeerSender(update.FromID), update.Action)
})
dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error {
return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, update.TopMsgID), client.getPeerSender(update.FromID), update.Action)
})
dispatcher.OnReadHistoryOutbox(client.updateReadReceipt)
dispatcher.OnReadHistoryInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryInbox) error {
return client.onOwnReadReceipt(client.makePortalKeyFromPeer(update.Peer, update.TopMsgID), update.MaxID)
})
dispatcher.OnReadChannelInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadChannelInbox) error {
return client.onOwnReadReceipt(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0), update.MaxID)
})
dispatcher.OnNotifySettings(client.onNotifySettings)
dispatcher.OnPinnedDialogs(client.onPinnedDialogs)
dispatcher.OnChatDefaultBannedRights(client.onChatDefaultBannedRights)
dispatcher.OnPeerBlocked(client.onPeerBlocked)
dispatcher.OnChat(client.onChat)
dispatcher.OnPhoneCall(client.onPhoneCall)
dispatcher.OnFallback(client.onUpdateWrapper)
client.updatesManager = updates.New(updates.Config{
OnNotChannelMember: client.onNotChannelMember,
@@ -426,7 +380,8 @@ func (t *TelegramClient) sendBadCredentialsOrUnknownError(err error) {
}
func (t *TelegramClient) onPing() {
if t.userLogin.BridgeState.GetPrev().StateEvent == status.StateConnected {
prev := t.userLogin.BridgeState.GetPrev()
if prev.StateEvent == status.StateConnected || prev.Error == updateHandlerStuck {
return
}
ctx := t.userLogin.Log.WithContext(t.main.Bridge.BackgroundCtx)

View File

@@ -860,7 +860,46 @@ func (t *TelegramClient) updateChannel(ctx context.Context, channel *tg.Channel)
return userInfo, nil
}
func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities, upd tg.UpdateClass) error {
const updateHandlerStuck status.BridgeStateErrorCode = "tg-update-handler-stuck"
func (t *TelegramClient) onUpdateWrapper(ctx context.Context, e tg.Entities, upd tg.UpdateClass) error {
doneChan := make(chan error, 1)
go func() {
doneChan <- t.onUpdate(ctx, e, upd)
}()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
startedAt := time.Now()
bridgeStateUpdated := false
for {
select {
case <-ticker.C:
zerolog.Ctx(ctx).Warn().
Time("started_at", startedAt).
Msg("Telegram update handling is taking long")
if time.Since(startedAt) > 3*time.Minute && !bridgeStateUpdated {
bridgeStateUpdated = true
t.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateUnknownError,
Error: updateHandlerStuck,
Message: "Processing messages from Telegram is stuck",
})
}
case err := <-doneChan:
if bridgeStateUpdated && t.userLogin.BridgeState.GetPrevUnsent().Error == updateHandlerStuck {
t.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateConnected,
Info: map[string]any{
"update_reason": "finished processing slow update",
},
})
}
return err
}
}
}
func (t *TelegramClient) onUpdate(ctx context.Context, e tg.Entities, upd tg.UpdateClass) error {
zerolog.Ctx(ctx).Trace().Stringer("update", upd).Msg("Raw update")
for userID, user := range e.Users {
zerolog.Ctx(ctx).Trace().Stringer("user", user).Msg("Raw user info in update")
@@ -883,7 +922,55 @@ func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities, upd
return err
}
}
return nil
switch update := upd.(type) {
case *tg.UpdateNewMessage:
return t.onUpdateNewMessage(ctx, e, update)
case *tg.UpdateNewChannelMessage:
return t.onUpdateNewMessage(ctx, e, update)
case *tg.UpdateChannel:
return t.onUpdateChannel(ctx, e, update)
case *tg.UpdateUserName:
return t.onUserName(ctx, e, update)
case *tg.UpdateDeleteMessages:
return t.onDeleteMessages(ctx, 0, update)
case *tg.UpdateDeleteChannelMessages:
return t.onDeleteMessages(ctx, update.ChannelID, update)
case *tg.UpdateEditMessage:
return t.onMessageEdit(ctx, update)
case *tg.UpdateEditChannelMessage:
return t.onMessageEdit(ctx, update)
case *tg.UpdateUserTyping:
return t.handleTyping(t.makePortalKeyFromID(ids.PeerTypeUser, update.UserID, 0), t.senderForUserID(update.UserID), update.Action)
case *tg.UpdateChatUserTyping:
if update.FromID.TypeID() != tg.PeerUserTypeID {
zerolog.Ctx(ctx).Warn().Str("from_id_type", update.FromID.TypeName()).Msg("unsupported from_id type")
return nil
}
return t.handleTyping(t.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID, 0), t.getPeerSender(update.FromID), update.Action)
case *tg.UpdateChannelUserTyping:
return t.handleTyping(t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, update.TopMsgID), t.getPeerSender(update.FromID), update.Action)
case *tg.UpdateReadHistoryOutbox:
return t.updateReadReceipt(ctx, e, update)
case *tg.UpdateReadHistoryInbox:
return t.onOwnReadReceipt(t.makePortalKeyFromPeer(update.Peer, update.TopMsgID), update.MaxID)
case *tg.UpdateReadChannelInbox:
return t.onOwnReadReceipt(t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0), update.MaxID)
case *tg.UpdateNotifySettings:
return t.onNotifySettings(ctx, e, update)
case *tg.UpdatePinnedDialogs:
return t.onPinnedDialogs(ctx, e, update)
case *tg.UpdateChatDefaultBannedRights:
return t.onChatDefaultBannedRights(ctx, e, update)
case *tg.UpdatePeerBlocked:
return t.onPeerBlocked(ctx, e, update)
case *tg.UpdateChat:
return t.onChat(ctx, e, update)
case *tg.UpdatePhoneCall:
return t.onPhoneCall(ctx, e, update)
default:
zerolog.Ctx(ctx).Debug().Type("update_type", update).Msg("Unhandled update type")
return nil
}
}
func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) error {