Fix memory consumption issue by changing jobOutRequest channels to use pointers and reducing buffer size (#864)

* Initial plan

* Fix memory consumption issue by changing jobOutRequest channels to use pointers

Co-authored-by: JohnRoesler <19351306+JohnRoesler@users.noreply.github.com>

* Change jobOutRequest channel buffer size from 1000 to 100

Co-authored-by: JohnRoesler <19351306+JohnRoesler@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: JohnRoesler <19351306+JohnRoesler@users.noreply.github.com>
Co-authored-by: John Roesler <johnrroesler@gmail.com>
This commit is contained in:
Copilot 2025-08-27 11:03:34 -05:00 committed by GitHub
parent 8187978b01
commit 9e8c79dc9b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 9 additions and 9 deletions

View File

@ -29,7 +29,7 @@ type executor struct {
// sends out jobs once completed // sends out jobs once completed
jobsOutCompleted chan uuid.UUID jobsOutCompleted chan uuid.UUID
// used to request jobs from the scheduler // used to request jobs from the scheduler
jobOutRequest chan jobOutRequest jobOutRequest chan *jobOutRequest
// sends out job needs to update the next runs // sends out job needs to update the next runs
jobUpdateNextRuns chan uuid.UUID jobUpdateNextRuns chan uuid.UUID

2
job.go
View File

@ -1136,7 +1136,7 @@ type job struct {
id uuid.UUID id uuid.UUID
name string name string
tags []string tags []string
jobOutRequest chan jobOutRequest jobOutRequest chan *jobOutRequest
runJobRequest chan runJobRequest runJobRequest chan runJobRequest
} }

View File

@ -90,7 +90,7 @@ type scheduler struct {
// used to send all the jobs out when a request is made by the client // used to send all the jobs out when a request is made by the client
allJobsOutRequest chan allJobsOutRequest allJobsOutRequest chan allJobsOutRequest
// used to send a jobs out when a request is made by the client // used to send a jobs out when a request is made by the client
jobOutRequestCh chan jobOutRequest jobOutRequestCh chan *jobOutRequest
// used to run a job on-demand when requested by the client // used to run a job on-demand when requested by the client
runJobRequestCh chan runJobRequest runJobRequestCh chan runJobRequest
// new jobs are received here // new jobs are received here
@ -140,7 +140,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobsOutForRescheduling: make(chan uuid.UUID), jobsOutForRescheduling: make(chan uuid.UUID),
jobUpdateNextRuns: make(chan uuid.UUID), jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID), jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000), jobOutRequest: make(chan *jobOutRequest, 100),
done: make(chan error, 1), done: make(chan error, 1),
} }
@ -159,7 +159,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
startedCh: make(chan struct{}), startedCh: make(chan struct{}),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stopErrCh: make(chan error, 1), stopErrCh: make(chan error, 1),
jobOutRequestCh: make(chan jobOutRequest), jobOutRequestCh: make(chan *jobOutRequest),
runJobRequestCh: make(chan runJobRequest), runJobRequestCh: make(chan runJobRequest),
allJobsOutRequest: make(chan allJobsOutRequest), allJobsOutRequest: make(chan allJobsOutRequest),
} }
@ -461,7 +461,7 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
s.jobs[id] = j s.jobs[id] = j
} }
func (s *scheduler) selectJobOutRequest(out jobOutRequest) { func (s *scheduler) selectJobOutRequest(out *jobOutRequest) {
if j, ok := s.jobs[out.id]; ok { if j, ok := s.jobs[out.id]; ok {
select { select {
case out.outChan <- j: case out.outChan <- j:

View File

@ -35,16 +35,16 @@ func callJobFuncWithParams(jobFunc any, params ...any) error {
return nil return nil
} }
func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob { func requestJob(id uuid.UUID, ch chan *jobOutRequest) *internalJob {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
return requestJobCtx(ctx, id, ch) return requestJobCtx(ctx, id, ch)
} }
func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob { func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan *jobOutRequest) *internalJob {
resp := make(chan internalJob, 1) resp := make(chan internalJob, 1)
select { select {
case ch <- jobOutRequest{ case ch <- &jobOutRequest{
id: id, id: id,
outChan: resp, outChan: resp,
}: }: