fix: dispatch pending events after user creation commits
The register handler, local/LDAP login and the OIDC callback all queue the user.created event via DispatchOnCommit but never called DispatchPending, so the event was silently dropped and its queue entry leaked. Flush after commit and discard on rollback.
This commit is contained in:
parent
9da51f5096
commit
b86710903b
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"code.vikunja.io/api/pkg/db"
|
"code.vikunja.io/api/pkg/db"
|
||||||
|
"code.vikunja.io/api/pkg/events"
|
||||||
"code.vikunja.io/api/pkg/log"
|
"code.vikunja.io/api/pkg/log"
|
||||||
"code.vikunja.io/api/pkg/models"
|
"code.vikunja.io/api/pkg/models"
|
||||||
"code.vikunja.io/api/pkg/modules/auth"
|
"code.vikunja.io/api/pkg/modules/auth"
|
||||||
|
|
@ -187,6 +188,9 @@ func HandleCallback(c *echo.Context) error {
|
||||||
|
|
||||||
s := db.NewSession()
|
s := db.NewSession()
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
// Discards events queued during a rolled-back transaction (e.g. user
|
||||||
|
// creation); a no-op once DispatchPending has run.
|
||||||
|
defer events.CleanupPending(s)
|
||||||
|
|
||||||
// Check if we have seen this user before
|
// Check if we have seen this user before
|
||||||
u, err := getOrCreateUser(s, cl, provider, idToken)
|
u, err := getOrCreateUser(s, cl, provider, idToken)
|
||||||
|
|
@ -212,6 +216,9 @@ func HandleCallback(c *echo.Context) error {
|
||||||
if err := enforceTOTPIfRequired(s, u, cb.TOTPPasscode); err != nil {
|
if err := enforceTOTPIfRequired(s, u, cb.TOTPPasscode); err != nil {
|
||||||
if commitErr := s.Commit(); commitErr != nil {
|
if commitErr := s.Commit(); commitErr != nil {
|
||||||
log.Errorf("Error committing session after failed OIDC TOTP attempt for user %d: %v", u.ID, commitErr)
|
log.Errorf("Error committing session after failed OIDC TOTP attempt for user %d: %v", u.ID, commitErr)
|
||||||
|
} else {
|
||||||
|
// The user creation above was committed, so its events are real.
|
||||||
|
events.DispatchPending(c.Request().Context(), s)
|
||||||
}
|
}
|
||||||
if user.IsErrInvalidTOTPPasscode(err) {
|
if user.IsErrInvalidTOTPPasscode(err) {
|
||||||
user.HandleFailedTOTPAuth(u)
|
user.HandleFailedTOTPAuth(u)
|
||||||
|
|
@ -233,6 +240,8 @@ func HandleCallback(c *echo.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
events.DispatchPending(c.Request().Context(), s)
|
||||||
|
|
||||||
// Create token
|
// Create token
|
||||||
return auth.NewUserAuthTokenResponse(u, c, false)
|
return auth.NewUserAuthTokenResponse(u, c, false)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,11 @@
|
||||||
package shared
|
package shared
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"code.vikunja.io/api/pkg/config"
|
"code.vikunja.io/api/pkg/config"
|
||||||
"code.vikunja.io/api/pkg/db"
|
"code.vikunja.io/api/pkg/db"
|
||||||
|
"code.vikunja.io/api/pkg/events"
|
||||||
"code.vikunja.io/api/pkg/log"
|
"code.vikunja.io/api/pkg/log"
|
||||||
"code.vikunja.io/api/pkg/metrics"
|
"code.vikunja.io/api/pkg/metrics"
|
||||||
"code.vikunja.io/api/pkg/models"
|
"code.vikunja.io/api/pkg/models"
|
||||||
|
|
@ -39,9 +42,12 @@ type UserRegister struct {
|
||||||
// busts the cached user-count metric so the registration shows up immediately.
|
// busts the cached user-count metric so the registration shows up immediately.
|
||||||
// The caller is responsible for the registration-enabled gate and input
|
// The caller is responsible for the registration-enabled gate and input
|
||||||
// validation; both v1 and v2 share this body.
|
// validation; both v1 and v2 share this body.
|
||||||
func RegisterUser(in *UserRegister) (*user.User, error) {
|
func RegisterUser(ctx context.Context, in *UserRegister) (*user.User, error) {
|
||||||
s := db.NewSession()
|
s := db.NewSession()
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
// Discards events queued during a rolled-back transaction; a no-op once
|
||||||
|
// DispatchPending has run.
|
||||||
|
defer events.CleanupPending(s)
|
||||||
|
|
||||||
newUser, err := models.RegisterUser(s, &user.User{
|
newUser, err := models.RegisterUser(s, &user.User{
|
||||||
Username: in.Username,
|
Username: in.Username,
|
||||||
|
|
@ -59,6 +65,8 @@ func RegisterUser(in *UserRegister) (*user.User, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
events.DispatchPending(ctx, s)
|
||||||
|
|
||||||
// Bust the cached user count so the new registration shows up in metrics
|
// Bust the cached user count so the new registration shows up in metrics
|
||||||
// immediately instead of after the regular cache expiry.
|
// immediately instead of after the regular cache expiry.
|
||||||
if config.MetricsEnabled.GetBool() {
|
if config.MetricsEnabled.GetBool() {
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,9 @@ func Login(c *echo.Context) (err error) {
|
||||||
|
|
||||||
s := db.NewSession()
|
s := db.NewSession()
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
// Discards events queued during a rolled-back transaction (e.g. LDAP user
|
||||||
|
// creation); a no-op once DispatchPending has run.
|
||||||
|
defer events.CleanupPending(s)
|
||||||
|
|
||||||
var user *user2.User
|
var user *user2.User
|
||||||
if config.AuthLdapEnabled.GetBool() {
|
if config.AuthLdapEnabled.GetBool() {
|
||||||
|
|
@ -127,6 +130,8 @@ func Login(c *echo.Context) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
events.DispatchPending(c.Request().Context(), s)
|
||||||
|
|
||||||
// Create token
|
// Create token
|
||||||
return auth.NewUserAuthTokenResponse(user, c, u.LongToken)
|
return auth.NewUserAuthTokenResponse(user, c, u.LongToken)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,7 @@ func RegisterUser(c *echo.Context) error {
|
||||||
return c.JSON(http.StatusBadRequest, models.Message{Message: "No or invalid user model provided."})
|
return c.JSON(http.StatusBadRequest, models.Message{Message: "No or invalid user model provided."})
|
||||||
}
|
}
|
||||||
|
|
||||||
newUser, err := shared.RegisterUser(userIn)
|
newUser, err := shared.RegisterUser(c.Request().Context(), userIn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -127,8 +127,8 @@ func registerLocalAuthRoutes(api huma.API) {
|
||||||
}, authConfirmEmail)
|
}, authConfirmEmail)
|
||||||
}
|
}
|
||||||
|
|
||||||
func authRegister(_ context.Context, in *struct{ Body shared.UserRegister }) (*registerUserBody, error) {
|
func authRegister(ctx context.Context, in *struct{ Body shared.UserRegister }) (*registerUserBody, error) {
|
||||||
newUser, err := shared.RegisterUser(&in.Body)
|
newUser, err := shared.RegisterUser(ctx, &in.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, translateDomainError(err)
|
return nil, translateDomainError(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue