feature: support truncate collection #32994
base: master
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: PowderLi The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Invalid PR Title Format Detected Your PR submission does not adhere to our required standards. To ensure clarity and consistency, please meet the following criteria:
Required Title Structure:
Where Example:
Please review and update your PR to comply with these guidelines. |
@PowderLi E2e jenkins job failed, comment |
@PowderLi ut workflow job failed, comment |
32201a0 to 1a50b05
@PowderLi ut workflow job failed, comment |
@PowderLi E2e jenkins job failed, comment |
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #32994 +/- ##
==========================================
+ Coverage 82.20% 82.31% +0.10%
==========================================
Files 1009 1012 +3
Lines 128884 129688 +804
==========================================
+ Hits 105945 106747 +802
+ Misses 18952 18930 -22
- Partials 3987 4011 +24
|
1a50b05 to 3bc5311
@PowderLi E2e jenkins job failed, comment |
3bc5311 to c4adc9c
c4adc9c to db28c03
db28c03 to ecfd633
@PowderLi E2e jenkins job failed, comment |
ecfd633 to 24e277b
Signed-off-by: PowderLi <min.li@zilliz.com>
24e277b to 71676fe
// CreateIndexesForTemp returns all indexes created on provided collection.
func (s *Server) CreateIndexesForTemp(ctx context.Context, req *indexpb.CollectionWithTempRequest) (*commonpb.Status, error) {
	log := log.Ctx(ctx).With(zap.Int64("collection", req.CollectionID), zap.Int64("tempCollection", req.TempCollectionID))
	log.Info("receive CopyIndexes to temp collection request")
We should also use info level in the following log.
	err := s.meta.indexMeta.CreateIndex(&model.Index{
		CollectionID: req.CollectionID,
		IndexID: int64(0),
		IsDeleted: true,
This will eventually cause the collection's indexes to be removed. I don't think we can do this, since truncating a collection is not guaranteed to succeed.
This is a guard index: it forbids new index operations on the same collection. If the truncate fails, the guard index will be removed when we drop the temp collection.
Make the special IndexID 0 a named constant for better readability.
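The named-constant suggestion and the guard-index behavior described above could look roughly like the sketch below. All names here (`GuardIndexID`, `Index`, `checkNoGuard`, `ErrTruncateInProgress`) are hypothetical and are not taken from the Milvus code base; `Index` is a trimmed-down stand-in for `model.Index`.

```go
package main

import (
	"errors"
	"fmt"
)

// GuardIndexID is a hypothetical name for the reserved index ID 0.
const GuardIndexID int64 = 0

// Index is a simplified stand-in for model.Index (assumption, for illustration).
type Index struct {
	CollectionID int64
	IndexID      int64
	IsDeleted    bool
}

// IsGuard reports whether the entry is the guard placeholder, not a real index.
func (i Index) IsGuard() bool { return i.IndexID == GuardIndexID }

// ErrTruncateInProgress is a hypothetical error for rejected index operations.
var ErrTruncateInProgress = errors.New("collection is being truncated")

// checkNoGuard rejects new index operations while a guard index is present.
func checkNoGuard(indexes []Index) error {
	for _, idx := range indexes {
		if idx.IsGuard() {
			return ErrTruncateInProgress
		}
	}
	return nil
}

func main() {
	indexes := []Index{{CollectionID: 100, IndexID: 7}}
	fmt.Println(checkNoGuard(indexes))

	// Adding the guard entry makes later index operations fail the check.
	indexes = append(indexes, Index{CollectionID: 100, IndexID: GuardIndexID, IsDeleted: true})
	fmt.Println(checkNoGuard(indexes))
}
```

With a named constant, call sites such as the guard check read as intent rather than as a magic number.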
		return merr.Status(err), nil
	}
	log.Debug("created a guard index(id=0) of target collection")
	indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), "")
Is copying only these indexes enough?
Which indexes do you think are missing?
Sorry for the unclear wording. I mean that copying all these indexes is enough; there is no need to create a guard index if we don't limit AlterIndex/CreateIndex in datacoord.
}

// DropIndexesForTemp returns all indexes created on provided collection.
func (s *Server) DropIndexesForTemp(ctx context.Context, req *indexpb.CollectionWithTempRequest) (*commonpb.Status, error) {
We can reuse datacoord.DropIndex directly.
We need to clear the guard index in the target collection whether the truncate succeeds or fails.
		return err
	}
	kvMap[key] = string(value)
	log.Ctx(ctx).Debug("add new indexes",
Use info level.
@@ -58,6 +58,7 @@ type Broker interface {
	GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool

	DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
	DropTempCollectionIndexes(ctx context.Context, collID UniqueID, tempCollID UniqueID) error
DropCollectionIndex is enough.
	return t.assignChannels()
}

func (t *copyCollectionTask) genCreateCollectionMsg(ctx context.Context, ts uint64) *ms.MsgPack {
It would be great to make this method a util function, since createCollectionTask can reuse it.
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type copyCollectionTask struct {
From my perspective, copyCollectionTask does almost the same things as createCollectionTask, except for the collection name. Can we combine these two tasks into one?
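One way to read the suggestion to combine the two tasks is to keep a single create task and derive the temp collection from the source definition, changing only the name. The sketch below is illustrative only: `collectionSpec`, `Describe`, `newCopyTask`, and the `docs_temp` name are hypothetical, and the real tasks carry far more state than this stand-in.

```go
package main

import "fmt"

// collectionSpec is a hypothetical, trimmed-down description of a collection.
type collectionSpec struct {
	Name      string
	ShardsNum int32
	Fields    []string
}

// createCollectionTask is a stand-in for the real task; it only holds a spec.
type createCollectionTask struct{ spec collectionSpec }

// Describe returns a readable summary of what the task would create.
func (t *createCollectionTask) Describe() string {
	return fmt.Sprintf("create %s (shards=%d, fields=%v)", t.spec.Name, t.spec.ShardsNum, t.spec.Fields)
}

// newCopyTask reuses createCollectionTask and changes only the collection name.
func newCopyTask(src collectionSpec, tempName string) *createCollectionTask {
	spec := src
	spec.Name = tempName
	// Copy the slice so the temp task never aliases the source definition.
	spec.Fields = append([]string(nil), src.Fields...)
	return &createCollectionTask{spec: spec}
}

func main() {
	src := collectionSpec{Name: "docs", ShardsNum: 2, Fields: []string{"id", "vector"}}
	fmt.Println(newCopyTask(src, "docs_temp").Describe())
}
```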
		log.Info("failed to release temp collection", zap.Error(err))
		return merr.Status(err), nil
	}
	_, err = c.garbageCollector.GcCollectionData(ctx, t.collInfo)
Wait for this ts to be synced.
In fact, we can make this a redo task, which is very useful for recycling resources when any error is encountered, especially when releasing a collection.
This is similar to dropping a collection.
	}
	defer func() {
		log.Debug("drop the temp collection async")
		c.dropCollectionForTemp(ctx, in, t.collectionID, t.tempCollectionID)
Why should we do this even when the truncating operation is done?
I think we can provide an abstraction that combines the undoTask and redoTask for the truncating workflow, like below:
type generalTask struct {
	undoTask
	redoTask
}
func (t *generalTask) Execute() error {
	ReturnErrIf(t.undoTask.Execute())
	go t.redoTask.Execute()
	return nil
}
Truncating a collection can then be simplified to:
undo.AddStep(CreateTempCollection, RemoveTempCollection) // 1
undo.AddStep(CreateIndexesForCollection, nullstep) // 2
undo.AddStep(LoadTempCollection, nullstep) // 3
undo.AddStep(ExchangeCollections, nullstep) // 4
redo.AddStep(RemoveOriginalCollection) // 5
Consider recovery and atomicity:
- If step 1 fails, no temp collection was created;
- If step 2 fails, the step executor will execute the undo step of 1, RemoveTempCollection, which will drop the indexes and release the temp collection; rootcoord will also do this when it starts;
- If step 3 or 4 fails, the handling is the same as for step 2;
- We can return once step 4 succeeds, because step 4 is atomic, so it introduces no side effects;
- After step 4 is done, the GC of the original collection is asynchronous;
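The proposal above can be made concrete with a small, self-contained sketch. This is not the Milvus step executor: the `step`, `undoTask`, `redoTask`, and `generalTask` types below are simplified stand-ins, `runTruncate` and its failure injection are hypothetical helpers for the demo, and the redo part does not retry. It only illustrates the control flow: undo steps run synchronously with reverse rollback on failure, and the redo step runs in the background after step 4.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// step pairs a forward action with an optional compensation (nil = nullstep).
type step struct {
	name string
	do   func() error
	undo func() error
}

// undoTask runs steps in order; on failure it undoes completed steps in reverse.
type undoTask struct{ steps []step }

func (t *undoTask) AddStep(name string, do, undo func() error) {
	t.steps = append(t.steps, step{name: name, do: do, undo: undo})
}

func (t *undoTask) Execute() error {
	for i, s := range t.steps {
		if err := s.do(); err != nil {
			for j := i - 1; j >= 0; j-- {
				if u := t.steps[j].undo; u != nil {
					_ = u() // best-effort rollback in this sketch
				}
			}
			return fmt.Errorf("step %q failed: %w", s.name, err)
		}
	}
	return nil
}

// redoTask runs cleanup steps that may complete after the caller has returned.
type redoTask struct{ steps []func() error }

func (t *redoTask) AddStep(do func() error) { t.steps = append(t.steps, do) }

func (t *redoTask) Execute() {
	for _, do := range t.steps {
		if err := do(); err != nil {
			return // a real implementation would retry; omitted here
		}
	}
}

// generalTask runs the undo part synchronously, then the redo part in the background.
type generalTask struct {
	undo undoTask
	redo redoTask
	wg   sync.WaitGroup
}

func (t *generalTask) Execute() error {
	if err := t.undo.Execute(); err != nil {
		return err
	}
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		t.redo.Execute()
	}()
	return nil
}

// runTruncate wires the five steps and fails at step failAt (0 = no failure).
func runTruncate(failAt int) ([]string, error) {
	var events []string
	act := func(n int, name string) func() error {
		return func() error {
			if n == failAt {
				return errors.New("injected failure")
			}
			events = append(events, name)
			return nil
		}
	}
	t := &generalTask{}
	t.undo.AddStep("CreateTempCollection", act(1, "CreateTempCollection"), act(-1, "RemoveTempCollection"))
	t.undo.AddStep("CreateIndexesForCollection", act(2, "CreateIndexesForCollection"), nil)
	t.undo.AddStep("LoadTempCollection", act(3, "LoadTempCollection"), nil)
	t.undo.AddStep("ExchangeCollections", act(4, "ExchangeCollections"), nil)
	t.redo.AddStep(act(5, "RemoveOriginalCollection"))
	err := t.Execute()
	t.wg.Wait() // only so the demo can inspect events deterministically
	return events, err
}

func main() {
	for _, failAt := range []int{0, 3} {
		events, err := runTruncate(failAt)
		fmt.Println(failAt, events, err)
	}
}
```

Only step 1 registers a compensation, so a failure at steps 2 to 4 triggers a single RemoveTempCollection, which matches the recovery cases listed above.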
#26280