chatsync: merge post-login and takeout syncs and refactor everything

This commit is contained in:
Tulir Asokan
2026-03-19 01:36:30 +02:00
parent b1b5745033
commit bfe5999951
10 changed files with 407 additions and 345 deletions

View File

@@ -55,7 +55,7 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
// Resume fetching dialogs using takeout and enqueue them for
// backfill.
go t.takeoutDialogsOnce.Do(func() {
if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
if err = t.syncChats(ctx, takeoutID, false); err != nil {
log.Err(err).Msg("Failed to takeout dialogs")
}
})
@@ -88,7 +88,7 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
// Fetch all dialogs using takeout and enqueue them for backfill.
go t.takeoutDialogsOnce.Do(func() {
if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
if err = t.syncChats(ctx, takeoutID, false); err != nil {
log.Err(err).Msg("Failed to takeout dialogs")
}
})
@@ -98,84 +98,6 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
}
}
// takeoutDialogs crawls the full dialog (chat) list through the takeout
// (data export) session identified by takeoutID, handling each page of
// dialogs and persisting a cursor so the crawl can resume after a restart.
//
// The crawl is a no-op if a previous run already finished
// (TakeoutDialogCrawlDone). It returns nil once all dialogs have been
// processed, or an error from the API or the persistence layer.
func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) error {
    log := zerolog.Ctx(ctx).With().Str("loop", "chat_fetch").Logger()
    if t.metadata.TakeoutDialogCrawlDone {
        log.Debug().Msg("Dialogs already crawled")
        return nil
    }
    // Fetch in pages of 100; pagination uses the peer of the last dialog
    // of the previous page as the offset.
    req := tg.MessagesGetDialogsRequest{
        Limit:      100,
        OffsetPeer: &tg.InputPeerEmpty{},
    }
    if t.metadata.TakeoutDialogCrawlCursor != "" {
        // Resume from the cursor persisted by an interrupted crawl.
        var err error
        req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.metadata.TakeoutDialogCrawlCursor)
        if err != nil {
            return fmt.Errorf("failed to get input peer for pagination: %w", err)
        }
    }
    for {
        log.Info().Stringer("cursor", req.OffsetPeer).Msg("Fetching dialogs")
        dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
            // Wrap the request in InvokeWithTakeout so it runs inside the
            // takeout session.
            var dialogs tg.MessagesDialogsBox
            err := t.client.Invoke(ctx,
                &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
                &dialogs)
            if err != nil {
                return nil, err
            } else if modified, ok := dialogs.Dialogs.AsModified(); !ok {
                return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs)
            } else {
                return modified, nil
            }
        })
        if err != nil {
            return fmt.Errorf("failed to get dialogs: %w", err)
        } else if len(dialogs.GetDialogs()) == 0 {
            // An empty page means the crawl is complete.
            t.metadata.TakeoutDialogCrawlDone = true
            if err = t.userLogin.Save(ctx); err != nil {
                return fmt.Errorf("failed to save user login: %w", err)
            }
            log.Debug().Msg("No more dialogs found")
            return nil
        }
        if req.OffsetPeer.TypeID() == tg.InputPeerEmptyTypeID {
            // This is the first fetch of dialogs, reset the pinned dialogs
            // based on the list.
            if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
                return err
            }
        }
        // createLimit of -1 means no cap on portal creation for the crawl.
        err = t.handleDialogs(ctx, dialogs, -1)
        if err != nil {
            return fmt.Errorf("failed to handle dialogs: %w", err)
        }
        // Advance the cursor to the last dialog of this page.
        portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), 0)
        if t.metadata.TakeoutDialogCrawlCursor == portalKey.ID {
            // The cursor did not advance, so there is nothing new; finish.
            t.metadata.TakeoutDialogCrawlDone = true
            t.metadata.TakeoutDialogCrawlCursor = ""
            log.Debug().Msg("No more dialogs found")
            return nil
        } else {
            t.metadata.TakeoutDialogCrawlCursor = portalKey.ID
        }
        if err = t.userLogin.Save(ctx); err != nil {
            return fmt.Errorf("failed to save user login: %w", err)
        }
        req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, portalKey.ID)
        if err != nil {
            return fmt.Errorf("failed to get input peer for pagination: %w", err)
        }
    }
}
func (t *TelegramClient) stopTakeout(ctx context.Context) error {
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
@@ -197,8 +119,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
var takeoutID int64
var err error
// TODO use takeout for forward backfill if already available
if !fetchParams.Forward { // Backwards
if (t.main.Config.Takeout.ForwardBackfill && fetchParams.Forward) || (t.main.Config.Takeout.BackwardBackfill && !fetchParams.Forward) {
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
takeoutID, err = t.getTakeoutID(ctx)
@@ -261,6 +182,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
log.Info().Any("req", req).Msg("Fetching messages")
msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
var box tg.MessagesMessagesBox
// TODO a single request can only fetch 100 messages, use multiple requests if the requested count is higher
err = t.client.Invoke(ctx, req, &box)
if err != nil {
return nil, err

356
pkg/connector/chatsync.go Normal file
View File

@@ -0,0 +1,356 @@
// mautrix-telegram - A Matrix-Telegram puppeting bridge.
// Copyright (C) 2025 Sumner Evans
// Copyright (C) 2026 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package connector
import (
"context"
"fmt"
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/bridgev2/simplevent"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
"go.mau.fi/mautrix-telegram/pkg/gotd/bin"
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
"go.mau.fi/mautrix-telegram/pkg/gotd/tgerr"
)
// syncChats fetches the dialog (chat) list page by page and queues a
// resync for each dialog, persisting a cursor so an interrupted sync can
// resume across restarts.
//
// If takeoutID is non-zero, every request is wrapped in the takeout (data
// export) session; in that case the whole sync is skipped unless
// Takeout.DialogSync is enabled. If onLogin is true while takeout dialog
// sync is enabled, only an initial batch of Sync.LoginLimit dialogs is
// synced and the sync is NOT marked complete, leaving the rest for the
// later takeout-based full sync.
func (t *TelegramClient) syncChats(ctx context.Context, takeoutID int64, onLogin bool) error {
    if takeoutID != 0 && !t.main.Config.Takeout.DialogSync {
        return nil
    }
    logWith := zerolog.Ctx(ctx).With().Str("loop", "chat sync")
    if onLogin {
        logWith = logWith.Bool("on_login", true)
    }
    if takeoutID != 0 {
        logWith = logWith.Int64("takeout_id", takeoutID)
    }
    log := logWith.Logger()
    // Only one chat sync may run at a time; warn if we have to wait.
    if !t.syncChatsLock.TryLock() {
        log.Warn().Msg("Waiting for chat sync lock")
        t.syncChatsLock.Lock()
        log.Debug().Msg("Acquired chat sync lock after waiting")
    }
    defer t.syncChatsLock.Unlock()
    if t.metadata.DialogSyncComplete {
        log.Debug().Msg("Dialogs already synced")
        return nil
    }
    // Negative limits mean unlimited. DialogSyncCount is how many dialogs
    // previous (interrupted) runs already handled, so subtract it from the
    // configured limits before starting.
    isFullSync := true
    updateLimit := subtractLimit(t.main.Config.Sync.UpdateLimit, t.metadata.DialogSyncCount)
    if onLogin && t.main.Config.Takeout.DialogSync {
        // Partial post-login sync; the takeout sync will complete the rest.
        updateLimit = t.main.Config.Sync.LoginLimit
        isFullSync = false
    }
    createLimit := subtractLimit(t.main.Config.Sync.CreateLimit, t.metadata.DialogSyncCount)
    var req tg.MessagesGetDialogsRequest
    isFirst := true
    if t.metadata.DialogSyncCursor != "" {
        // Resume pagination from the persisted cursor.
        isFirst = false
        var err error
        req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.metadata.DialogSyncCursor)
        if err != nil {
            return fmt.Errorf("failed to get input peer for pagination: %w", err)
        }
    } else {
        req.OffsetPeer = &tg.InputPeerEmpty{}
    }
    // Wrap the request in the takeout session when one is in use.
    var wrappedReq bin.Object
    if takeoutID != 0 {
        wrappedReq = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req}
    } else {
        wrappedReq = &req
    }
    // Loop until the update limit is exhausted (negative = unlimited).
    for updateLimit < 0 || updateLimit > 0 {
        if updateLimit < 0 {
            req.Limit = 100
        } else {
            // A single request can fetch at most 100 dialogs.
            req.Limit = min(100, updateLimit)
        }
        log.Info().
            Stringer("request", &req).
            Int("update_limit", updateLimit).
            Int("create_limit", createLimit).
            Msg("Fetching dialogs")
        dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
            var dialogs tg.MessagesDialogsBox
            // Retry the invoke automatically on FLOOD_WAIT errors.
            retry := true
            var err error
            for retry {
                retry, err = tgerr.FloodWait(ctx, t.client.Invoke(ctx, wrappedReq, &dialogs))
            }
            if err != nil {
                return nil, err
            } else if modified, ok := dialogs.Dialogs.AsModified(); !ok {
                return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs)
            } else {
                return modified, nil
            }
        })
        if err != nil {
            return fmt.Errorf("failed to get dialogs: %w", err)
        } else if len(dialogs.GetDialogs()) == 0 {
            log.Debug().Msg("No more dialogs found (empty response)")
            break
        }
        if isFirst {
            // This is the first fetch of dialogs, reset the pinned dialogs based on the list.
            if err = t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
                return fmt.Errorf("failed to save pinned dialogs: %w", err)
            }
        }
        isFirst = false
        dialogList := dialogs.GetDialogs()
        // Trim the page to the remaining update limit in case the server
        // returned more dialogs than requested.
        if updateLimit > 0 && len(dialogList) > updateLimit {
            dialogList = dialogList[:updateLimit]
        }
        err = t.handleDialogs(ctx, dialogList, dialogs, createLimit)
        if err != nil {
            return fmt.Errorf("failed to handle dialogs: %w", err)
        }
        updateLimit = subtractLimit(updateLimit, len(dialogList))
        createLimit = subtractLimit(createLimit, len(dialogList))
        // Persist the last dialog's peer as the resume cursor.
        cursorPortalKey := t.makePortalKeyFromPeer(dialogList[len(dialogList)-1].GetPeer(), 0)
        if t.metadata.DialogSyncCursor == cursorPortalKey.ID {
            log.Debug().Msg("No more dialogs found (last dialog is same as old cursor)")
            break
        }
        t.metadata.DialogSyncCursor = cursorPortalKey.ID
        t.metadata.DialogSyncCount += len(dialogList)
        if err = t.userLogin.Save(ctx); err != nil {
            return fmt.Errorf("failed to save user login to update cursor: %w", err)
        }
        req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, cursorPortalKey.ID)
        if err != nil {
            return fmt.Errorf("failed to get input peer for pagination: %w", err)
        }
    }
    if isFullSync {
        // A full sync finished: mark it complete and clear the resume state.
        t.metadata.DialogSyncComplete = true
        t.metadata.DialogSyncCursor = ""
        t.metadata.DialogSyncCount = 0
        if err := t.userLogin.Save(ctx); err != nil {
            return fmt.Errorf("failed to save user login after successful sync: %w", err)
        }
    }
    log.Info().Msg("Finished dialog sync")
    return nil
}
// subtractLimit decreases limit by count. A negative limit means
// "unlimited" and is returned unchanged; otherwise the result is clamped
// so it never goes below zero.
func subtractLimit(limit, count int) int {
    if limit < 0 {
        return limit
    }
    return max(0, limit-count)
}
// resetPinnedDialogs replaces the stored pinned-dialog list with the
// pinned entries found in the given dialogs and persists the user login.
func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.DialogClass) error {
    // Rebuild the list from scratch based on this dialog snapshot.
    t.metadata.PinnedDialogs = nil
    for _, d := range dialogs {
        if !d.GetPinned() {
            continue
        }
        key := t.makePortalKeyFromPeer(d.GetPeer(), 0)
        t.metadata.PinnedDialogs = append(t.metadata.PinnedDialogs, key.ID)
    }
    return t.userLogin.Save(ctx)
}
// handleDialogs processes one page of dialogs: for each dialog it resolves
// chat info from the users/chats metadata bundled in the response, queues
// a ChatResync remote event (creating missing portals only for the first
// createLimit entries of the page when createLimit >= 0), and queues a
// read receipt for the user's last read message in that chat.
func (t *TelegramClient) handleDialogs(ctx context.Context, dialogList []tg.DialogClass, meta tg.ModifiedMessagesDialogs, createLimit int) error {
    log := zerolog.Ctx(ctx)
    // Index the response metadata for constant-time lookups per dialog.
    users := map[int64]tg.UserClass{}
    for _, user := range meta.GetUsers() {
        users[user.GetID()] = user
    }
    chats := map[int64]tg.ChatClass{}
    for _, chat := range meta.GetChats() {
        chats[chat.GetID()] = chat
    }
    messages := map[networkid.MessageID]tg.MessageClass{}
    for _, message := range meta.GetMessages() {
        messages[ids.GetMessageIDFromMessage(message)] = message
    }
    for i, d := range dialogList {
        // Skip anything that isn't a plain *tg.Dialog.
        dialog, ok := d.(*tg.Dialog)
        if !ok {
            continue
        }
        log := log.With().
            Stringer("peer", dialog.Peer).
            Int("top_message", dialog.TopMessage).
            Logger()
        log.Debug().Msg("Syncing dialog")
        portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
        portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
        if err != nil {
            return err
        }
        if dialog.UnreadCount == 0 && !dialog.UnreadMark {
            // Everything is read; record the top message as the read marker.
            portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage
        }
        // Resolve chat info by peer type, skipping deleted users and
        // forbidden or unsupported chat/channel types.
        var chatInfo *bridgev2.ChatInfo
        switch peer := dialog.Peer.(type) {
        case *tg.PeerUser:
            switch user := users[peer.UserID].(type) {
            case *tg.User:
                if user.GetDeleted() {
                    log.Debug().Int64("user_id", peer.UserID).Msg("Not syncing portal because user is deleted")
                    continue
                }
                chatInfo, err = t.getDMChatInfo(ctx, peer.UserID)
                if err != nil {
                    return fmt.Errorf("failed to get dm info for %d: %w", peer.UserID, err)
                }
            default:
                log.Debug().
                    Int64("user_id", peer.UserID).
                    Type("user_type", user).
                    Msg("Not syncing portal because user type is unsupported")
                continue
            }
        case *tg.PeerChat:
            switch chat := chats[peer.ChatID].(type) {
            case *tg.Chat:
                // Need to get full chat info to get the member list
                chatInfo, err = t.GetChatInfo(ctx, portal)
                if err != nil {
                    return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
                }
            case *tg.ChatForbidden:
                log.Debug().
                    Int64("chat_id", peer.ChatID).
                    Msg("Not syncing portal because chat is forbidden")
                continue
            default:
                log.Debug().
                    Int64("chat_id", peer.ChatID).
                    Type("chat_type", chat).
                    Msg("Not syncing portal because chat type is unsupported")
                continue
            }
        case *tg.PeerChannel:
            switch channel := chats[peer.ChannelID].(type) {
            case *tg.Channel:
                var mfm *memberFetchMeta
                chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel)
                if err != nil {
                    return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
                }
                // Member fetch failures are logged but don't abort the sync.
                err = t.fillChannelMembers(ctx, mfm, chatInfo.Members)
                if err != nil {
                    log.Err(err).Msg("Failed to get channel members")
                }
            case *tg.ChannelForbidden:
                log.Debug().
                    Int64("channel_id", peer.ChannelID).
                    Msg("Not syncing portal because channel is forbidden")
                continue
            default:
                log.Debug().
                    Int64("channel_id", peer.ChannelID).
                    Type("channel_type", channel).
                    Msg("Not syncing portal because channel type is unsupported")
                continue
            }
        }
        if portal.MXID == "" {
            // The portal doesn't exist yet: decide whether to create it.
            // Check what the latest message is
            topMessage := messages[ids.MakeMessageID(dialog.Peer, dialog.TopMessage)]
            if topMessage == nil {
                if dialog.TopMessage == 0 {
                    log.Debug().Msg("Not syncing portal because there are no messages")
                    continue
                }
                log.Warn().Msg("TopMessage of dialog not in messages map")
            } else if topMessage.TypeID() == tg.MessageServiceTypeID {
                action := topMessage.(*tg.MessageService).Action
                if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID {
                    log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear")
                    continue
                }
            }
            // Only create portals for the first createLimit dialogs of the
            // page (i indexes the page, including skipped entries).
            if createLimit >= 0 && i >= createLimit {
                continue
            }
        }
        t.fillUserLocalMeta(chatInfo, dialog)
        res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
            ChatInfo: chatInfo,
            EventMeta: simplevent.EventMeta{
                Type: bridgev2.RemoteEventChatResync,
                LogContext: func(c zerolog.Context) zerolog.Context {
                    return c.Str("update", "sync")
                },
                PortalKey:    portalKey,
                CreatePortal: true,
            },
            // Backfill is needed whenever the remote top message is newer
            // than the latest bridged message.
            CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) {
                if latestMessage == nil {
                    return true, nil
                }
                _, latestMessageID, err := ids.ParseMessageID(latestMessage.ID)
                if err != nil {
                    panic(err)
                }
                return dialog.TopMessage > latestMessageID, nil
            },
        })
        if err = resultToError(res); err != nil {
            return err
        }
        // Generate a read receipt from the last known read message id
        res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
            EventMeta: simplevent.EventMeta{
                Type:      bridgev2.RemoteEventReadReceipt,
                PortalKey: portalKey,
                Sender:    t.mySender(),
            },
            LastTarget:          ids.MakeMessageID(portalKey, dialog.ReadInboxMaxID),
            ReadUpToStreamOrder: int64(dialog.ReadInboxMaxID),
        })
        if err = resultToError(res); err != nil {
            return err
        }
    }
    return nil
}

View File

@@ -110,6 +110,7 @@ type TelegramClient struct {
takeoutAccepted *exsync.Event
stopTakeoutTimer *time.Timer
takeoutDialogsOnce sync.Once
syncChatsLock sync.Mutex
prevReactionPoll map[networkid.PortalKey]time.Time
prevReactionPollLock sync.Mutex

View File

@@ -54,7 +54,7 @@ func fnSync(ce *commands.Event) {
wg.Add(1)
go func() {
defer wg.Done()
if err := client.SyncChats(ce.Ctx); err != nil {
if err := client.syncChats(ce.Ctx, 0, false); err != nil {
ce.Reply("Failed to synchronize chats for %s: %v", login.ID, err)
}
}()

View File

@@ -68,9 +68,16 @@ type TelegramConfig struct {
Sync struct {
UpdateLimit int `yaml:"update_limit"`
CreateLimit int `yaml:"create_limit"`
LoginLimit int `yaml:"login_sync_limit"`
DirectChats bool `yaml:"direct_chats"`
} `yaml:"sync"`
Takeout struct {
DialogSync bool `yaml:"dialog_sync"`
ForwardBackfill bool `yaml:"forward_backfill"`
BackwardBackfill bool `yaml:"backward_backfill"`
} `yaml:"takeout"`
ContactAvatars bool `yaml:"contact_avatars"`
ContactNames bool `yaml:"contact_names"`
MaxMemberCount int `yaml:"max_member_count"`
@@ -110,7 +117,11 @@ func upgradeConfig(helper up.Helper) {
helper.Copy(up.Int, "ping", "timeout_seconds")
helper.Copy(up.Int, "sync", "update_limit")
helper.Copy(up.Int, "sync", "create_limit")
helper.Copy(up.Int, "sync", "login_sync_limit")
helper.Copy(up.Bool, "sync", "direct_chats")
helper.Copy(up.Bool, "takeout", "dialog_sync")
helper.Copy(up.Bool, "takeout", "forward_backfill")
helper.Copy(up.Bool, "takeout", "backward_backfill")
helper.Copy(up.Bool, "contact_avatars")
helper.Copy(up.Bool, "contact_names")
helper.Copy(up.Int, "max_member_count")
@@ -130,6 +141,7 @@ func (tg *TelegramConnector) GetConfig() (example string, data any, upgrader up.
{"member_list"},
{"ping"},
{"sync"},
{"takeout"},
{"max_member_count"},
},
Base: ExampleConfig,

View File

@@ -56,14 +56,29 @@ ping:
sync:
# Number of most recently active dialogs to check when syncing chats.
# Set to 0 to remove limit.
update_limit: 0
# Number of most recently active dialogs to create portals for when syncing
# chats.
# Set to 0 to remove limit.
# Set to -1 to remove limit.
update_limit: 100
# Number of most recently active dialogs to create portals for when syncing chats.
# Set to -1 to remove limit.
create_limit: 15
# Number of chats to sync immediately on login before the data export is accepted.
# The create_limit above still applies. This is ignored if takeout.dialog_sync is false.
login_sync_limit: 15
# Whether to sync and create portals for direct chats at startup.
direct_chats: false
direct_chats: true
takeout:
# Should the bridge use the data export mode for syncing the full chat list?
# If true, login_sync_limit of chats is synced immediately on login,
# then the rest are synced after the takeout is accepted.
dialog_sync: false
# Should the bridge use the data export mode for forward backfilling messages?
# This should be set to true if the forward backfill limits are set to high values,
# but is probably not necessary otherwise.
forward_backfill: false
# Should the bridge use the data export mode for backward backfilling messages?
# This only affects the backfill queue, which is only available on Beeper.
backward_backfill: false
# Maximum number of participants in chats to bridge. Only applies when the
# portal is being created. If there are more members when trying to create a

View File

@@ -241,7 +241,7 @@ func (bl *baseLogin) finalizeLogin(
err := client.clientInitialized.Wait(bgCtx)
if err != nil {
log.Err(err).Msg("Failed to wait for client init to sync chats after login")
} else if err = client.SyncChats(log.WithContext(client.clientCtx)); err != nil {
} else if err = client.syncChats(log.WithContext(client.clientCtx), 0, true); err != nil {
log.Err(err).Msg("Failed to sync chats")
}
}()
@@ -250,6 +250,9 @@ func (bl *baseLogin) finalizeLogin(
if metadata.IsBot {
return
}
if !bl.main.Config.Takeout.BackwardBackfill && !bl.main.Config.Takeout.ForwardBackfill && !bl.main.Config.Takeout.DialogSync {
return
}
log := ul.Log.With().Str("component", "post-login takeout").Logger()
client.takeoutLock.Lock()
defer client.takeoutLock.Unlock()

View File

@@ -57,7 +57,7 @@ func (gm *GhostMetadata) IsMin() bool {
type PortalMetadata struct {
IsSuperGroup bool `json:"is_supergroup,omitempty"`
IsForumGeneral bool `json:"is_forum_general,omitempty"`
ReadUpTo int `json:"read_up_to,omitempty"`
ReadUpTo int `json:"read_up_to,omitempty"` // FIXME this shouldn't be here
AllowedReactions []string `json:"allowed_reactions"`
LastSync jsontime.Unix `json:"last_sync,omitempty"`
FullSynced bool `json:"full_synced,omitempty"`
@@ -90,8 +90,9 @@ type UserLoginMetadata struct {
TakeoutInvalidated bool `json:"takeout_invalidated,omitempty"`
TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"`
TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"`
DialogSyncComplete bool `json:"takeout_portal_crawl_done,omitempty"`
DialogSyncCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"`
DialogSyncCount int `json:"dialog_sync_count,omitempty"`
PinnedDialogs []networkid.PortalID `json:"pinned_dialogs,omitempty"`
@@ -101,9 +102,10 @@ type UserLoginMetadata struct {
func (u *UserLoginMetadata) ResetOnLogout() {
u.Session.AuthKey = nil
u.TakeoutID = 0
u.TakeoutDialogCrawlDone = false
u.TakeoutInvalidated = false
u.TakeoutDialogCrawlCursor = networkid.PortalID("")
u.DialogSyncComplete = false
u.DialogSyncCursor = networkid.PortalID("")
u.DialogSyncCount = 0
u.PushEncryptionKey = nil
}

View File

@@ -1,249 +0,0 @@
// mautrix-telegram - A Matrix-Telegram puppeting bridge.
// Copyright (C) 2025 Sumner Evans
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package connector
import (
"context"
"fmt"
"math"
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/bridgev2/simplevent"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
)
// SyncChats fetches the dialog list in a single request (capped by the
// configured update limit) and dispatches every dialog for portal sync.
func (t *TelegramClient) SyncChats(ctx context.Context) error {
    updateLimit := t.main.Config.Sync.UpdateLimit
    if updateLimit <= 0 {
        // Non-positive means no limit; ask for as many as possible.
        updateLimit = math.MaxInt32
    }
    createLimit := t.main.Config.Sync.CreateLimit
    zerolog.Ctx(ctx).Info().
        Int("update_limit", updateLimit).
        Int("create_limit", createLimit).
        Msg("syncing chats")
    dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
        raw, err := t.client.API().MessagesGetDialogs(ctx, &tg.MessagesGetDialogsRequest{
            Limit:      updateLimit,
            OffsetPeer: &tg.InputPeerEmpty{},
        })
        if err != nil {
            return nil, err
        }
        modified, ok := raw.(tg.ModifiedMessagesDialogs)
        if !ok {
            return nil, fmt.Errorf("unexpected dialogs type %T", raw)
        }
        return modified, nil
    })
    if err != nil {
        return err
    }
    if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
        return err
    }
    return t.handleDialogs(ctx, dialogs, createLimit)
}
// resetPinnedDialogs replaces the stored pinned-dialog list with the
// dialogs marked as pinned in the given list, then persists the user login.
func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.DialogClass) error {
    t.metadata.PinnedDialogs = nil
    for _, dialog := range dialogs {
        if dialog.GetPinned() {
            portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
            t.metadata.PinnedDialogs = append(t.metadata.PinnedDialogs, portalKey.ID)
        }
    }
    return t.userLogin.Save(ctx)
}
// handleDialogs syncs every dialog in the given response: it queues a
// ChatResync remote event (creating at most createLimit new portals when
// createLimit >= 0) and a read receipt for the user's last read message.
//
// Fixes over the previous version:
//   - the deleted-user check used an unchecked `.(*tg.User)` assertion that
//     panicked when the user was missing from the response or was a
//     non-*tg.User value; it was also performed twice;
//   - chat/channel map lookups could yield a nil interface, making the
//     subsequent TypeID() call panic;
//   - hitting the create limit used `break`, which also stopped resyncing
//     the remaining dialogs' existing portals; it now skips creation only.
func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedMessagesDialogs, createLimit int) error {
    log := zerolog.Ctx(ctx)
    // Index the response metadata for constant-time lookups per dialog.
    users := map[networkid.UserID]tg.UserClass{}
    for _, user := range dialogs.GetUsers() {
        users[ids.MakeUserID(user.GetID())] = user
    }
    chats := map[int64]tg.ChatClass{}
    for _, chat := range dialogs.GetChats() {
        chats[chat.GetID()] = chat
    }
    messages := map[networkid.MessageID]tg.MessageClass{}
    for _, message := range dialogs.GetMessages() {
        messages[ids.GetMessageIDFromMessage(message)] = message
    }
    var created int
    for _, d := range dialogs.GetDialogs() {
        // Skip anything that isn't a plain *tg.Dialog.
        dialog, ok := d.(*tg.Dialog)
        if !ok {
            continue
        }
        log := log.With().
            Stringer("peer", dialog.Peer).
            Int("top_message", dialog.TopMessage).
            Logger()
        log.Debug().Msg("Syncing dialog")
        portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
        portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
        if err != nil {
            return err
        }
        if dialog.UnreadCount == 0 && !dialog.UnreadMark {
            // Everything is read; record the top message as the read marker.
            portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage
        }
        // Resolve chat info by peer type, skipping deleted users and
        // forbidden or unsupported chat/channel types.
        var chatInfo *bridgev2.ChatInfo
        switch peer := dialog.Peer.(type) {
        case *tg.PeerUser:
            // Checked assertion: the entry may be absent or a non-*tg.User
            // (e.g. tg.UserEmpty), which would otherwise panic.
            user, ok := users[ids.MakeUserID(peer.UserID)].(*tg.User)
            if !ok {
                log.Debug().
                    Int64("user_id", peer.UserID).
                    Msg("Not syncing portal because user type is unsupported")
                continue
            }
            if user.GetDeleted() {
                log.Debug().Int64("user_id", peer.UserID).Msg("Not syncing portal because user is deleted")
                continue
            }
            chatInfo, err = t.getDMChatInfo(ctx, peer.UserID)
            if err != nil {
                return fmt.Errorf("failed to get dm info for %d: %w", peer.UserID, err)
            }
        case *tg.PeerChat:
            chat, ok := chats[peer.ChatID]
            if !ok {
                // Avoid calling TypeID() on a nil interface.
                log.Debug().
                    Int64("chat_id", peer.ChatID).
                    Msg("Not syncing portal because chat is missing from response")
                continue
            }
            if chat.TypeID() == tg.ChatForbiddenTypeID {
                log.Debug().
                    Int64("chat_id", peer.ChatID).
                    Msg("Not syncing portal because chat is forbidden")
                continue
            } else if chat.TypeID() != tg.ChatTypeID {
                log.Debug().
                    Int64("chat_id", peer.ChatID).
                    Type("chat_type", chat).
                    Msg("Not syncing portal because chat type is unsupported")
                continue
            }
            // Need to get full chat info to get the member list
            chatInfo, err = t.GetChatInfo(ctx, portal)
            if err != nil {
                return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
            }
        case *tg.PeerChannel:
            channel, ok := chats[peer.ChannelID]
            if !ok {
                // Avoid calling TypeID() on a nil interface.
                log.Debug().
                    Int64("channel_id", peer.ChannelID).
                    Msg("Not syncing portal because channel is missing from response")
                continue
            }
            if channel.TypeID() == tg.ChannelForbiddenTypeID {
                log.Debug().
                    Int64("channel_id", peer.ChannelID).
                    Msg("Not syncing portal because channel is forbidden")
                continue
            } else if channel.TypeID() != tg.ChannelTypeID {
                log.Debug().
                    Int64("channel_id", peer.ChannelID).
                    Type("channel_type", channel).
                    Msg("Not syncing portal because channel type is unsupported")
                continue
            }
            var mfm *memberFetchMeta
            chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel)
            if err != nil {
                return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
            }
            // Member fetch failures are logged but don't abort the sync.
            err = t.fillChannelMembers(ctx, mfm, chatInfo.Members)
            if err != nil {
                log.Err(err).Msg("Failed to get channel members")
            }
        }
        if portal == nil || portal.MXID == "" {
            // Check what the latest message is
            topMessage := messages[ids.MakeMessageID(dialog.Peer, dialog.TopMessage)]
            if topMessage == nil {
                if dialog.TopMessage == 0 {
                    log.Debug().Msg("Not syncing portal because there are no messages")
                    continue
                }
                log.Warn().Msg("TopMessage of dialog not in messages map")
            } else if topMessage.TypeID() == tg.MessageServiceTypeID {
                action := topMessage.(*tg.MessageService).Action
                if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID {
                    log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear")
                    continue
                }
            }
            created++ // The portal will have to be created
            // Past the create limit: skip creating this portal, but keep
            // processing remaining dialogs so existing portals still resync.
            if createLimit >= 0 && created > createLimit {
                continue
            }
        }
        t.fillUserLocalMeta(chatInfo, dialog)
        res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
            ChatInfo: chatInfo,
            EventMeta: simplevent.EventMeta{
                Type: bridgev2.RemoteEventChatResync,
                LogContext: func(c zerolog.Context) zerolog.Context {
                    return c.Str("update", "sync")
                },
                PortalKey:    portalKey,
                CreatePortal: true,
            },
            // Backfill is needed when the remote top message is newer than
            // the latest bridged message.
            CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) {
                if latestMessage == nil {
                    return true, nil
                }
                _, latestMessageID, err := ids.ParseMessageID(latestMessage.ID)
                if err != nil {
                    panic(err)
                }
                return dialog.TopMessage > latestMessageID, nil
            },
        })
        if err = resultToError(res); err != nil {
            return err
        }
        // Generate a read receipt from the last known read message id
        res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
            EventMeta: simplevent.EventMeta{
                Type:      bridgev2.RemoteEventReadReceipt,
                PortalKey: portalKey,
                Sender:    t.mySender(),
            },
            LastTarget:          ids.MakeMessageID(portalKey, dialog.ReadInboxMaxID),
            ReadUpToStreamOrder: int64(dialog.ReadInboxMaxID),
        })
        if err = resultToError(res); err != nil {
            return err
        }
    }
    return nil
}

View File

@@ -74,7 +74,7 @@ func FloodWait(ctx context.Context, err error, opts ...FloodWaitOption) (bool, e
if d, ok := AsFloodWait(err); ok && d < opt.maxDuration {
timer := opt.clock.Timer(d + 1*time.Second)
defer clock.StopTimer(timer)
zerolog.Ctx(ctx).Debug().Dur("duration", d).Msg("Waiting on flood wait")
zerolog.Ctx(ctx).Warn().Dur("duration", d).Msg("Waiting on flood wait")
select {
case <-timer.C():
return true, err