mirror of
https://github.com/mautrix/telegram.git
synced 2026-05-17 07:25:46 +03:00
client: don't drop errors from client.Run()
This commit is contained in:
@@ -81,7 +81,7 @@ type TelegramClient struct {
|
||||
updatesManager *updates.Manager
|
||||
clientCtx context.Context
|
||||
clientCancel context.CancelFunc
|
||||
clientDone *Future[error]
|
||||
clientDone *exsync.Event
|
||||
clientInitialized *exsync.Event
|
||||
mu sync.Mutex
|
||||
|
||||
@@ -166,6 +166,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
|
||||
prevReactionPoll: map[networkid.PortalKey]time.Time{},
|
||||
|
||||
clientInitialized: exsync.NewEvent(),
|
||||
clientDone: exsync.NewEvent(),
|
||||
}
|
||||
|
||||
if !login.Metadata.(*UserLoginMetadata).Session.HasAuthKey() {
|
||||
@@ -411,6 +412,9 @@ func (t *TelegramClient) sendBadCredentialsOrUnknownError(err error) {
|
||||
StateEvent: status.StateUnknownError,
|
||||
Error: "tg-unknown-error",
|
||||
Message: humanise.Error(err),
|
||||
Info: map[string]any{
|
||||
"go_error": err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -508,15 +512,11 @@ func (t *TelegramClient) onAuthError(err error) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *TelegramClient) Connect(_ context.Context) {
|
||||
func (t *TelegramClient) Connect(ctx context.Context) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
log := zerolog.Ctx(context.Background()).With().Int64("user_id", t.telegramUserID).Logger()
|
||||
ctx = log.WithContext(ctx)
|
||||
|
||||
log := zerolog.Ctx(ctx)
|
||||
if !t.metadata.Session.HasAuthKey() {
|
||||
log.Warn().Msg("user does not have an auth key, sending bad credentials state")
|
||||
t.sendBadCredentialsOrUnknownError(ErrNoAuthKey)
|
||||
@@ -532,39 +532,38 @@ func (t *TelegramClient) Connect(_ context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
t.clientCtx = ctx
|
||||
t.clientCancel = cancel
|
||||
t.clientDone = NewFuture[error]()
|
||||
t.clientDone.Clear()
|
||||
t.clientInitialized.Clear()
|
||||
go t.runInBackground(ctx)
|
||||
}
|
||||
|
||||
runTelegramClient(ctx, t.client, t.clientInitialized, t.clientDone, func(ctx context.Context) error {
|
||||
func (t *TelegramClient) runInBackground(ctx context.Context) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
err := t.client.Run(ctx, func(ctx context.Context) error {
|
||||
t.clientInitialized.Set()
|
||||
log.Info().Msg("Client running starting updates")
|
||||
return t.updatesManager.Run(ctx, t.client.API(), t.telegramUserID, updates.AuthOptions{
|
||||
IsBot: t.metadata.IsBot,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func runTelegramClient(ctx context.Context, client *telegram.Client, initialized *exsync.Event, done *Future[error], callback func(ctx context.Context) error) {
|
||||
go func() {
|
||||
err := client.Run(ctx, func(ctx context.Context) error {
|
||||
initialized.Set()
|
||||
return callback(ctx)
|
||||
})
|
||||
initialized.Set()
|
||||
done.Set(err)
|
||||
}()
|
||||
t.clientDone.Set()
|
||||
t.clientInitialized.Set()
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Client exited with error")
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TelegramClient) Disconnect() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
t.userLogin.Log.Info().Msg("Disconnecting client")
|
||||
t.userLogin.Log.Debug().Msg("Disconnecting client")
|
||||
|
||||
if t.clientCancel != nil {
|
||||
t.clientCancel()
|
||||
t.userLogin.Log.Info().Msg("Waiting for client")
|
||||
err, _ := t.clientDone.Get(context.Background())
|
||||
t.userLogin.Log.Info().Err(err).Msg("Client done")
|
||||
t.userLogin.Log.Debug().Msg("Waiting for client disconnection")
|
||||
<-t.clientDone.GetChan()
|
||||
}
|
||||
|
||||
t.userLogin.Log.Info().Msg("Disconnect complete")
|
||||
@@ -614,7 +613,8 @@ func (t *TelegramClient) getSingleChannel(ctx context.Context, id int64) (*tg.Ch
|
||||
|
||||
func (t *TelegramClient) IsLoggedIn() bool {
|
||||
// TODO use less hacky check than context cancellation
|
||||
return t != nil && t.clientCtx != nil && t.client != nil && t.clientCtx.Err() == nil &&
|
||||
return t != nil && t.client != nil &&
|
||||
t.clientInitialized.IsSet() && !t.clientDone.IsSet() &&
|
||||
t.metadata.Session.HasAuthKey()
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package connector
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -25,7 +26,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/util/exsync"
|
||||
"go.mau.fi/zerozap"
|
||||
"go.uber.org/zap"
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
@@ -136,13 +136,22 @@ func (bl *baseLogin) makeClient(ctx context.Context, dispatcher *tg.UpdateDispat
|
||||
})
|
||||
|
||||
bl.ctx, bl.cancel = context.WithTimeoutCause(log.WithContext(bl.main.Bridge.BackgroundCtx), LoginTimeout, ErrLoginTimeout)
|
||||
initialized := exsync.NewEvent()
|
||||
done := NewFuture[error]()
|
||||
runTelegramClient(bl.ctx, bl.client, initialized, done, waitContextDone)
|
||||
connectResult := NewFuture[error]()
|
||||
go func() {
|
||||
err := bl.client.Run(bl.ctx, func(ctx context.Context) error {
|
||||
connectResult.Set(nil)
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
})
|
||||
connectResult.Set(err)
|
||||
if err != nil && !errors.Is(err, bl.ctx.Err()) {
|
||||
log.Err(err).Msg("Login client exited with error")
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debug().Msg("Waiting for client to connect")
|
||||
err := initialized.Wait(ctx)
|
||||
if err != nil {
|
||||
connErr, ctxErr := connectResult.Get(ctx)
|
||||
if err := cmp.Or(connErr, ctxErr); err != nil {
|
||||
bl.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -139,8 +139,8 @@ func (c *Client) Run(ctx context.Context, f func(ctx context.Context) error) (er
|
||||
// handling or pool creation.
|
||||
c.ctx, c.cancel = context.WithCancel(ctx)
|
||||
|
||||
c.log.Info("Starting")
|
||||
defer c.log.Info("Closed")
|
||||
c.log.Info("Client starting")
|
||||
defer c.log.Info("Client closed")
|
||||
// Cancel client on exit.
|
||||
defer c.cancel()
|
||||
defer func() {
|
||||
|
||||
Reference in New Issue
Block a user