Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize route switching when freeze segment #321

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft

Conversation

hwjiangkai
Copy link
Contributor

What problem does this PR solve?

Optimize the routing logic for segment full

Issue Number: close #xxx

Problem Summary

What is changed and how does it work?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

@codecov
Copy link

codecov bot commented Dec 5, 2022

Codecov Report

Merging #321 (8c06b70) into main (09f3748) will decrease coverage by 3.64%.
The diff coverage is 54.38%.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #321      +/-   ##
==========================================
- Coverage   62.72%   59.08%   -3.64%     
==========================================
  Files         133      175      +42     
  Lines       12128    14474    +2346     
==========================================
+ Hits         7607     8552     +945     
- Misses       3977     5306    +1329     
- Partials      544      616      +72     
Impacted Files Coverage Δ
internal/controller/eventbus/server/instance.go 72.04% <ø> (ø)
internal/controller/trigger/storage/secret.go 48.48% <ø> (ø)
...nternal/controller/trigger/storage/subscription.go 54.71% <ø> (ø)
...ernal/controller/trigger/storage/trigger_worker.go 63.41% <ø> (ø)
...rnal/primitive/interceptor/errinterceptor/error.go 0.00% <0.00%> (ø)
.../primitive/interceptor/memberinterceptor/member.go 0.00% <0.00%> (ø)
internal/primitive/vanus/id.go 19.68% <0.00%> (-0.65%) ⬇️
internal/raft/log/recovery.go 52.27% <ø> (ø)
internal/raft/log/wal.go 100.00% <ø> (ø)
internal/store/block/entry.go 70.96% <ø> (ø)
... and 159 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1157347...8c06b70. Read the comment docs.

internal/store/block/raw.go Outdated Show resolved Hide resolved
internal/store/block/raw.go Outdated Show resolved Hide resolved
internal/store/block/raft/appender.go Show resolved Hide resolved
@hwjiangkai hwjiangkai force-pushed the segment branch 2 times, most recently from b4563e9 to 8b132b7 Compare December 9, 2022 08:07
internal/store/block/raft/appender.go Show resolved Hide resolved
internal/store/segment/replica.go Outdated Show resolved Hide resolved
internal/store/vsb/block_open_test.go Show resolved Hide resolved
internal/store/vsb/block_append_test.go Show resolved Hide resolved
@@ -106,14 +106,15 @@ func (b *vsBlock) Delete(context.Context) error {
}

func (b *vsBlock) status() block.Statistics {
return b.stat(b.makeSnapshot())
m, indexes := b.makeSnapshot()
return b.stat(m, indexes, block.StateWorking)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass state with m.archived.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m.archived has been deleted, and the segment status cannot be rolled back

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meta.archived

client/pkg/eventlog/eventlog_impl.go Show resolved Hide resolved
client/pkg/eventlog/eventlog_impl.go Show resolved Hide resolved
client/pkg/eventlog/eventlog_impl.go Outdated Show resolved Hide resolved
client/pkg/eventlog/eventlog_impl.go Show resolved Hide resolved
client/pkg/eventlog/eventlog_impl.go Outdated Show resolved Hide resolved
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
client/pkg/eventbus/eventbus.go Outdated Show resolved Hide resolved
client/pkg/eventbus/eventbus.go Outdated Show resolved Hide resolved
client/pkg/eventbus/eventbus.go Outdated Show resolved Hide resolved
client/pkg/eventbus/eventbus.go Outdated Show resolved Hide resolved
Comment on lines 145 to 146
if l.logWriter != nil {
return l.logWriter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe access l.logWriter is not safe.

@@ -297,7 +288,6 @@ func TestVSBlock_Append(t *testing.T) {
So(archived, ShouldBeTrue)

stat = b.status()
So(stat.Archived, ShouldBeTrue)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check state

@@ -104,7 +105,6 @@ func TestVSBlock_Open(t *testing.T) {

stat := b.status()
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
So(stat.Archived, ShouldBeTrue)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check state

@@ -141,7 +141,6 @@ func TestVSBlock_Open(t *testing.T) {

stat := b.status()
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
So(stat.Archived, ShouldBeFalse)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check state

internal/trigger/server_test.go Show resolved Hide resolved
StateArchived = State("archived")
)

func (s State) ToSegmentState() string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't place this at here.


segmentNum := len(l.readableSegments)
n := sort.Search(segmentNum, func(i int) bool {
return l.readableSegments[uint64(i)].EndOffset() > offset
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the key of readableSegments is not a index

Signed-off-by: jyjiangkai <jyjiangkai@163.com>
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
return seqs, frag, actx.size(b.dataOffset) >= b.capacity, nil
full := actx.size(b.dataOffset) >= b.capacity
if full && b.lis != nil {
m, indexes := makeSnapshot(b.actx, b.indexes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use b.actx and b.indexes in PrepareAppend()


s.replicas.Range(func(key, value interface{}) bool {
b, _ := value.(replica)
if b.appender.Status().Leader.Equals(vanus.ID(segment.LeaderBlockId)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use LeaderBlockId is unsafe, because it may be not equal to info.leader from info.term.

state := block.StateWorking
m, indexes := b.makeSnapshot()
if m.archived {
state = block.StateArchiving
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StateArchiving does not match the semantics of archived.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the value of state needs to persist to disk.

@@ -216,6 +219,8 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
}

if op.Policy.AccessMode() == api.ReadOnly {
b.readableMu.RLock()
defer b.readableMu.RUnlock()
if len(b.readableLogs) == 0 {
b.refreshReadableLogs(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unlock when refresh route info.

@@ -225,6 +230,8 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
}
return eventlogs, nil
} else if op.Policy.AccessMode() == api.ReadWrite {
b.writableMu.RLock()
defer b.writableMu.RUnlock()
if len(b.writableLogs) == 0 {
b.refreshWritableLogs(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unlock when refresh route info

Comment on lines 132 to 133
l.logWriter = nil
l.logReader = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do not call Close()?

@@ -185,19 +184,23 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
}

if op.Policy.AccessMode() == api.ReadOnly {
b.readableMu.RLock()
defer b.readableMu.RUnlock()
if len(b.readableLogs) == 0 {
b.refreshReadableLogs(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unlock when refresh route info.

}
return nil, errors.ErrResourceNotFound.WithMessage("eventlog not found")
} else if op.Policy.AccessMode() == api.ReadWrite {
b.writableMu.RLock()
defer b.writableMu.RUnlock()
if len(b.writableLogs) == 0 {
b.refreshWritableLogs(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unlock when refresh route info.

// LastEntryStime
// time.UnixMilli
tailSeg := fetchTailSegment(ctx, segs)
if tailSeg.firstEventBornAt.Before(t) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use firstEventBornAt is unreasonable.

@hwjiangkai hwjiangkai force-pushed the segment branch 2 times, most recently from af4e196 to f4afed5 Compare February 6, 2023 12:52
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
@ifplusor ifplusor changed the title feat: Optimize the routing logic for segment full feat: optimize route switching when freeze segment Feb 10, 2023
@ifplusor ifplusor self-assigned this Feb 14, 2023
@ifplusor ifplusor marked this pull request as draft February 25, 2023 11:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants