backfill: add support for forward backfilling more than 100 messages

This commit is contained in:
Tulir Asokan
2026-04-08 00:20:29 +03:00
parent 117c5cd0ce
commit cc32d48fea

View File

@@ -153,56 +153,91 @@ func (tc *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev
if fetchParams.Portal.Metadata.(*PortalMetadata).IsForumGeneral {
topicID = 1
}
var req bin.Object
if topicID == ids.TopicIDSpaceRoom {
return nil, nil
} else if topicID > 0 {
req = &tg.MessagesGetRepliesRequest{
Peer: peer,
MsgID: topicID,
Limit: fetchParams.Count,
MinID: minID,
OffsetID: offsetID,
}
} else {
req = &tg.MessagesGetHistoryRequest{
Peer: peer,
Limit: fetchParams.Count,
MinID: minID,
OffsetID: offsetID,
}
}
if takeoutID != 0 {
req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req}
}
log.Info().Any("req", req).Msg("Fetching messages")
msgs, err := APICallWithUpdates(ctx, tc, func() (tg.ModifiedMessagesMessages, error) {
var box tg.MessagesMessagesBox
// TODO a single request can only fetch 100 messages, use multiple requests if the requested count is higher
err = tc.client.Invoke(ctx, req, &box)
if err != nil {
return nil, err
}
msgs, ok := box.Messages.(tg.ModifiedMessagesMessages)
if !ok {
return nil, fmt.Errorf("unsupported messages type %T", box.Messages)
}
return msgs, nil
})
if err != nil {
if tgerr.Is(err, tg.ErrTakeoutInvalid) {
tc.metadata.TakeoutID = 0
err := tc.userLogin.Save(ctx)
if err != nil {
log.Err(err).Msg("Failed to save user login after clearing takeout ID")
} else {
log.Debug().Msg("Cleared invalid takeout ID")
limit := fetchParams.Count
const chunkLimit = 100
makeReq := func() bin.Object {
if topicID > 0 {
return &tg.MessagesGetRepliesRequest{
Peer: peer,
MsgID: topicID,
Limit: min(limit, chunkLimit),
MinID: minID,
OffsetID: offsetID,
}
}
return &tg.MessagesGetHistoryRequest{
Peer: peer,
Limit: min(limit, chunkLimit),
MinID: minID,
OffsetID: offsetID,
}
}
var messages []tg.MessageClass
requestCount := 0
for limit > 0 {
requestCount++
req := makeReq()
if takeoutID != 0 {
req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req}
}
log.Info().Any("req", req).Msg("Fetching messages")
resp, err := APICallWithUpdates(ctx, tc, func() (tg.ModifiedMessagesMessages, error) {
var box tg.MessagesMessagesBox
retry := true
attempts := 0
var err error
for retry && attempts < 5 {
retry, err = tgerr.FloodWait(ctx, tc.client.Invoke(ctx, req, &box))
attempts++
}
if err != nil {
return nil, err
}
msgs, ok := box.Messages.(tg.ModifiedMessagesMessages)
if !ok {
return nil, fmt.Errorf("unsupported messages type %T", box.Messages)
}
return msgs, nil
})
if err != nil {
if tgerr.Is(err, tg.ErrTakeoutInvalid) {
tc.metadata.TakeoutID = 0
err := tc.userLogin.Save(ctx)
if err != nil {
log.Err(err).Msg("Failed to save user login after clearing takeout ID")
} else {
log.Debug().Msg("Cleared invalid takeout ID")
}
}
return nil, err
}
newMessages := resp.GetMessages()
if messages == nil {
messages = newMessages
} else {
messages = append(messages, resp.GetMessages()...)
}
if len(newMessages) < chunkLimit || !fetchParams.Forward {
break
}
limit -= len(newMessages)
offsetID = newMessages[len(newMessages)-1].GetID()
if takeoutID == 0 {
waitTime := time.Duration(min(requestCount*2, 15)) * time.Second
log.Debug().
Dur("wait_time", waitTime).
Msg("Not using takeout, waiting before requesting another batch of messages")
select {
case <-time.After(waitTime):
case <-ctx.Done():
return nil, ctx.Err()
}
}
return nil, err
}
messages := msgs.GetMessages()
portal := fetchParams.Portal
// If the first message is the last read message, mark the chat as read